diff --git a/pkg/controlplane/instance.go b/pkg/controlplane/instance.go index e31f62089b5..aefa2ad077d 100644 --- a/pkg/controlplane/instance.go +++ b/pkg/controlplane/instance.go @@ -433,7 +433,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer}, schedulingrest.RESTStorageProvider{}, storagerest.RESTStorageProvider{}, - flowcontrolrest.RESTStorageProvider{}, + flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory}, // keep apps after extensions so legacy clients resolve the extensions versions of shared resource names. // See https://github.com/kubernetes/kubernetes/issues/42392 appsrest.StorageProvider{}, diff --git a/pkg/registry/flowcontrol/ensurer/flowschema.go b/pkg/registry/flowcontrol/ensurer/flowschema.go index 3e9515e248f..dbbfc5eb10d 100644 --- a/pkg/registry/flowcontrol/ensurer/flowschema.go +++ b/pkg/registry/flowcontrol/ensurer/flowschema.go @@ -24,9 +24,11 @@ import ( flowcontrolv1beta2 "k8s.io/api/flowcontrol/v1beta2" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2" + flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2" flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2" ) @@ -46,9 +48,10 @@ type FlowSchemaRemover interface { // NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that // can be used to ensure a set of suggested FlowSchema configuration objects. -func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { +func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer { wrapper := &flowSchemaWrapper{ client: client, + lister: lister, } return &fsEnsurer{ strategy: newSuggestedEnsureStrategy(wrapper), @@ -58,9 +61,10 @@ func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) // NewMandatoryFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that // can be used to ensure a set of mandatory FlowSchema configuration objects. -func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { +func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer { wrapper := &flowSchemaWrapper{ client: client, + lister: lister, } return &fsEnsurer{ strategy: newMandatoryEnsureStrategy(wrapper), @@ -70,10 +74,11 @@ func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) // NewFlowSchemaRemover returns a FlowSchemaRemover instance that // can be used to remove a set of FlowSchema configuration objects. -func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface) FlowSchemaRemover { +func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaRemover { return &fsEnsurer{ wrapper: &flowSchemaWrapper{ client: client, + lister: lister, }, } } @@ -82,9 +87,8 @@ func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface) FlowSche // names that are candidates for deletion from the cluster. // bootstrap: a set of hard coded FlowSchema configuration objects // kube-apiserver maintains in-memory. -func GetFlowSchemaRemoveCandidate(client flowcontrolclient.FlowSchemaInterface, bootstrap []*flowcontrolv1beta2.FlowSchema) ([]string, error) { - // TODO(101667): Use a lister here to avoid periodic LIST calls - fsList, err := client.List(context.TODO(), metav1.ListOptions{}) +func GetFlowSchemaRemoveCandidate(lister flowcontrollisters.FlowSchemaLister, bootstrap []*flowcontrolv1beta2.FlowSchema) ([]string, error) { + fsList, err := lister.List(labels.Everything()) if err != nil { return nil, fmt.Errorf("failed to list FlowSchema - %w", err) } @@ -94,9 +98,9 @@ func GetFlowSchemaRemoveCandidate(client flowcontrolclient.FlowSchemaInterface, bootstrapNames.Insert(bootstrap[i].GetName()) } - currentObjects := make([]metav1.Object, len(fsList.Items)) - for i := range fsList.Items { - currentObjects[i] = &fsList.Items[i] + currentObjects := make([]metav1.Object, len(fsList)) + for i := range fsList { + currentObjects[i] = fsList[i] } return getRemoveCandidate(bootstrapNames, currentObjects), nil @@ -131,6 +135,7 @@ func (e *fsEnsurer) Remove(flowSchemas []string) error { // we can manage all boiler plate code in one place. type flowSchemaWrapper struct { client flowcontrolclient.FlowSchemaInterface + lister flowcontrollisters.FlowSchemaLister } func (fs *flowSchemaWrapper) TypeName() string { @@ -156,7 +161,7 @@ func (fs *flowSchemaWrapper) Update(object runtime.Object) (runtime.Object, erro } func (fs *flowSchemaWrapper) Get(name string) (configurationObject, error) { - return fs.client.Get(context.TODO(), name, metav1.GetOptions{}) + return fs.lister.Get(name) } func (fs *flowSchemaWrapper) Delete(name string) error { diff --git a/pkg/registry/flowcontrol/ensurer/flowschema_test.go b/pkg/registry/flowcontrol/ensurer/flowschema_test.go index 48eca1cef54..ef2a82e2ca3 100644 --- a/pkg/registry/flowcontrol/ensurer/flowschema_test.go +++ b/pkg/registry/flowcontrol/ensurer/flowschema_test.go @@ -27,44 +27,41 @@ import ( "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/client-go/kubernetes/fake" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2" + flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2" + "k8s.io/client-go/tools/cache" flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" ) func TestEnsureFlowSchema(t *testing.T) { tests := []struct { name string - strategy func(flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer + strategy func(flowcontrolclient.FlowSchemaInterface, flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer current *flowcontrolv1beta2.FlowSchema bootstrap *flowcontrolv1beta2.FlowSchema expected *flowcontrolv1beta2.FlowSchema }{ // for suggested configurations { - name: "suggested flow schema does not exist - the object should always be re-created", - strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { - return NewSuggestedFlowSchemaEnsurer(client) - }, + name: "suggested flow schema does not exist - the object should always be re-created", + strategy: NewSuggestedFlowSchemaEnsurer, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: nil, expected: newFlowSchema("fs1", "pl1", 100).Object(), }, { - name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated", - strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { - return NewSuggestedFlowSchemaEnsurer(client) - }, + name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated", + strategy: NewSuggestedFlowSchemaEnsurer, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), }, { - name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated", - strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { - return NewSuggestedFlowSchemaEnsurer(client) - }, + name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated", + strategy: NewSuggestedFlowSchemaEnsurer, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), @@ -72,28 +69,22 @@ func TestEnsureFlowSchema(t *testing.T) { // for mandatory configurations { - name: "mandatory flow schema does not exist - new object should be created", - strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { - return NewMandatoryFlowSchemaEnsurer(client) - }, + name: "mandatory flow schema does not exist - new object should be created", + strategy: NewMandatoryFlowSchemaEnsurer, bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), current: nil, expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), }, { - name: "mandatory flow schema exists, annotation is missing - annotation should be added", - strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { - return NewMandatoryFlowSchemaEnsurer(client) - }, + name: "mandatory flow schema exists, annotation is missing - annotation should be added", + strategy: NewMandatoryFlowSchemaEnsurer, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 100).Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), }, { - name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated", - strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { - return NewMandatoryFlowSchemaEnsurer(client) - }, + name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated", + strategy: NewMandatoryFlowSchemaEnsurer, bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), @@ -103,11 +94,13 @@ func TestEnsureFlowSchema(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { client := fake.NewSimpleClientset().FlowcontrolV1beta2().FlowSchemas() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) if test.current != nil { client.Create(context.TODO(), test.current, metav1.CreateOptions{}) + indexer.Add(test.current) } - ensurer := test.strategy(client) + ensurer := test.strategy(client, flowcontrollisters.NewFlowSchemaLister(indexer)) err := ensurer.Ensure([]*flowcontrolv1beta2.FlowSchema{test.bootstrap}) if err != nil { @@ -322,11 +315,13 @@ func TestRemoveFlowSchema(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { client := fake.NewSimpleClientset().FlowcontrolV1beta2().FlowSchemas() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) if test.current != nil { client.Create(context.TODO(), test.current, metav1.CreateOptions{}) + indexer.Add(test.current) } - remover := NewFlowSchemaRemover(client) + remover := NewFlowSchemaRemover(client, flowcontrollisters.NewFlowSchemaLister(indexer)) err := remover.Remove([]string{test.bootstrapName}) if err != nil { t.Fatalf("Expected no error, but got: %v", err) @@ -409,17 +404,20 @@ func TestGetFlowSchemaRemoveCandidate(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := fake.NewSimpleClientset().FlowcontrolV1beta2().FlowSchemas() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) for i := range test.current { - client.Create(context.TODO(), test.current[i], metav1.CreateOptions{}) + indexer.Add(test.current[i]) } - removeListGot, err := GetFlowSchemaRemoveCandidate(client, test.bootstrap) + lister := flowcontrollisters.NewFlowSchemaLister(indexer) + removeListGot, err := GetFlowSchemaRemoveCandidate(lister, test.bootstrap) if err != nil { t.Fatalf("Expected no error, but got: %v", err) } - if !cmp.Equal(test.expected, removeListGot) { + if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool { + return a < b + })) { t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot)) } }) diff --git a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go index 479f09988c8..e75bf16eaef 100644 --- a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go +++ b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration.go @@ -24,9 +24,11 @@ import ( flowcontrolv1beta2 "k8s.io/api/flowcontrol/v1beta2" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2" + flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2" flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2" ) @@ -46,9 +48,10 @@ type PriorityLevelRemover interface { // NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that // can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects. -func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { +func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer { wrapper := &priorityLevelConfigurationWrapper{ client: client, + lister: lister, } return &plEnsurer{ strategy: newSuggestedEnsureStrategy(wrapper), @@ -58,9 +61,10 @@ func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLe // NewMandatoryPriorityLevelEnsurer returns a PriorityLevelEnsurer instance that // can be used to ensure a set of mandatory PriorityLevelConfiguration configuration objects. -func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { +func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer { wrapper := &priorityLevelConfigurationWrapper{ client: client, + lister: lister, } return &plEnsurer{ strategy: newMandatoryEnsureStrategy(wrapper), @@ -70,10 +74,11 @@ func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConf // NewPriorityLevelRemover returns a PriorityLevelRemover instance that // can be used to remove a set of PriorityLevelConfiguration configuration objects. -func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelRemover { +func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelRemover { return &plEnsurer{ wrapper: &priorityLevelConfigurationWrapper{ client: client, + lister: lister, }, } } @@ -82,9 +87,8 @@ func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfiguration // names that are candidates for removal from the cluster. // bootstrap: a set of hard coded PriorityLevelConfiguration configuration // objects kube-apiserver maintains in-memory. -func GetPriorityLevelRemoveCandidate(client flowcontrolclient.PriorityLevelConfigurationInterface, bootstrap []*flowcontrolv1beta2.PriorityLevelConfiguration) ([]string, error) { - // TODO(101667): Use a lister here to avoid periodic LIST calls - plList, err := client.List(context.TODO(), metav1.ListOptions{}) +func GetPriorityLevelRemoveCandidate(lister flowcontrollisters.PriorityLevelConfigurationLister, bootstrap []*flowcontrolv1beta2.PriorityLevelConfiguration) ([]string, error) { + plList, err := lister.List(labels.Everything()) if err != nil { return nil, fmt.Errorf("failed to list PriorityLevelConfiguration - %w", err) } @@ -94,9 +98,9 @@ func GetPriorityLevelRemoveCandidate(client flowcontrolclient.PriorityLevelConfi bootstrapNames.Insert(bootstrap[i].GetName()) } - currentObjects := make([]metav1.Object, len(plList.Items)) - for i := range plList.Items { - currentObjects[i] = &plList.Items[i] + currentObjects := make([]metav1.Object, len(plList)) + for i := range plList { + currentObjects[i] = plList[i] } return getRemoveCandidate(bootstrapNames, currentObjects), nil @@ -131,6 +135,7 @@ func (e *plEnsurer) Remove(priorityLevels []string) error { // with this we can manage all boiler plate code in one place. type priorityLevelConfigurationWrapper struct { client flowcontrolclient.PriorityLevelConfigurationInterface + lister flowcontrollisters.PriorityLevelConfigurationLister } func (fs *priorityLevelConfigurationWrapper) TypeName() string { @@ -156,7 +161,7 @@ func (fs *priorityLevelConfigurationWrapper) Update(object runtime.Object) (runt } func (fs *priorityLevelConfigurationWrapper) Get(name string) (configurationObject, error) { - return fs.client.Get(context.TODO(), name, metav1.GetOptions{}) + return fs.lister.Get(name) } func (fs *priorityLevelConfigurationWrapper) Delete(name string) error { diff --git a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go index 90269134a74..a0da43a8ebd 100644 --- a/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go +++ b/pkg/registry/flowcontrol/ensurer/prioritylevelconfiguration_test.go @@ -27,43 +27,41 @@ import ( "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/client-go/kubernetes/fake" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2" + flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2" + "k8s.io/client-go/tools/cache" flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2" "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" ) func TestEnsurePriorityLevel(t *testing.T) { tests := []struct { name string - strategy func(flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer + strategy func(flowcontrolclient.PriorityLevelConfigurationInterface, flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer current *flowcontrolv1beta2.PriorityLevelConfiguration bootstrap *flowcontrolv1beta2.PriorityLevelConfiguration expected *flowcontrolv1beta2.PriorityLevelConfiguration }{ // for suggested configurations { - name: "suggested priority level configuration does not exist - the object should always be re-created", - strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { - return NewSuggestedPriorityLevelEnsurerEnsurer(client) - }, + name: "suggested priority level configuration does not exist - the object should always be re-created", + strategy: NewSuggestedPriorityLevelEnsurerEnsurer, bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(), current: nil, expected: newPLConfiguration("pl1").WithLimited(10).Object(), }, { - name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated", - strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { - return NewSuggestedPriorityLevelEnsurerEnsurer(client) - }, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), - current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(), - expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), + name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated", + strategy: NewSuggestedPriorityLevelEnsurerEnsurer, + bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), + current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(), + expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), }, { - name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated", - strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { - return NewSuggestedPriorityLevelEnsurerEnsurer(client) - }, + name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated", + strategy: NewSuggestedPriorityLevelEnsurerEnsurer, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), @@ -71,28 +69,22 @@ func TestEnsurePriorityLevel(t *testing.T) { // for mandatory configurations { - name: "mandatory priority level configuration does not exist - new object should be created", - strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { - return NewMandatoryPriorityLevelEnsurer(client) - }, + name: "mandatory priority level configuration does not exist - new object should be created", + strategy: NewMandatoryPriorityLevelEnsurer, bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), current: nil, expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), }, { - name: "mandatory priority level configuration exists, annotation is missing - annotation is added", - strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { - return NewMandatoryPriorityLevelEnsurer(client) - }, + name: "mandatory priority level configuration exists, annotation is missing - annotation is added", + strategy: NewMandatoryPriorityLevelEnsurer, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), }, { - name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated", - strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { - return NewMandatoryPriorityLevelEnsurer(client) - }, + name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated", + strategy: NewMandatoryPriorityLevelEnsurer, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), @@ -101,13 +93,14 @@ func TestEnsurePriorityLevel(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := fake.NewSimpleClientset().FlowcontrolV1beta2().PriorityLevelConfigurations() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) if test.current != nil { client.Create(context.TODO(), test.current, metav1.CreateOptions{}) + indexer.Add(test.current) } - ensurer := test.strategy(client) + ensurer := test.strategy(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)) err := ensurer.Ensure([]*flowcontrolv1beta2.PriorityLevelConfiguration{test.bootstrap}) if err != nil { @@ -338,11 +331,13 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { client := fake.NewSimpleClientset().FlowcontrolV1beta2().PriorityLevelConfigurations() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) if test.current != nil { client.Create(context.TODO(), test.current, metav1.CreateOptions{}) + indexer.Add(test.current) } - remover := NewPriorityLevelRemover(client) + remover := NewPriorityLevelRemover(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)) err := remover.Remove([]string{test.bootstrapName}) if err != nil { t.Fatalf("Expected no error, but got: %v", err) @@ -425,17 +420,20 @@ func TestGetPriorityLevelRemoveCandidate(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - client := fake.NewSimpleClientset().FlowcontrolV1beta2().PriorityLevelConfigurations() + indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) for i := range test.current { - client.Create(context.TODO(), test.current[i], metav1.CreateOptions{}) + indexer.Add(test.current[i]) } - removeListGot, err := GetPriorityLevelRemoveCandidate(client, test.bootstrap) + lister := flowcontrollisters.NewPriorityLevelConfigurationLister(indexer) + removeListGot, err := GetPriorityLevelRemoveCandidate(lister, test.bootstrap) if err != nil { t.Fatalf("Expected no error, but got: %v", err) } - if !cmp.Equal(test.expected, removeListGot) { + if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool { + return a < b + })) { t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot)) } }) diff --git a/pkg/registry/flowcontrol/rest/storage_flowcontrol.go b/pkg/registry/flowcontrol/rest/storage_flowcontrol.go index 595400baac1..12077700f58 100644 --- a/pkg/registry/flowcontrol/rest/storage_flowcontrol.go +++ b/pkg/registry/flowcontrol/rest/storage_flowcontrol.go @@ -27,7 +27,10 @@ import ( "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" serverstorage "k8s.io/apiserver/pkg/server/storage" + "k8s.io/client-go/informers" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2" + flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2" + "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/api/legacyscheme" "k8s.io/kubernetes/pkg/apis/flowcontrol" @@ -42,7 +45,9 @@ import ( var _ genericapiserver.PostStartHookProvider = RESTStorageProvider{} // RESTStorageProvider is a provider of REST storage -type RESTStorageProvider struct{} +type RESTStorageProvider struct { + InformerFactory informers.SharedInformerFactory +} // PostStartHookName is the name of the post-start-hook provided by flow-control storage const PostStartHookName = "priority-and-fairness-config-producer" @@ -107,10 +112,24 @@ func (p RESTStorageProvider) GroupName() string { // PostStartHook returns the hook func that launches the config provider func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) { - return PostStartHookName, ensureAPFBootstrapConfiguration, nil + bce := &bootstrapConfigurationEnsurer{ + informersSynced: []cache.InformerSynced{ + p.InformerFactory.Flowcontrol().V1beta2().PriorityLevelConfigurations().Informer().HasSynced, + p.InformerFactory.Flowcontrol().V1beta2().FlowSchemas().Informer().HasSynced, + }, + fsLister: p.InformerFactory.Flowcontrol().V1beta2().FlowSchemas().Lister(), + plcLister: p.InformerFactory.Flowcontrol().V1beta2().PriorityLevelConfigurations().Lister(), + } + return PostStartHookName, bce.ensureAPFBootstrapConfiguration, nil } -func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookContext) error { +type bootstrapConfigurationEnsurer struct { + informersSynced []cache.InformerSynced + fsLister flowcontrollisters.FlowSchemaLister + plcLister flowcontrollisters.PriorityLevelConfigurationLister +} + +func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookContext) error { clientset, err := flowcontrolclient.NewForConfig(hookContext.LoopbackClientConfig) if err != nil { return fmt.Errorf("failed to initialize clientset for APF - %w", err) @@ -121,11 +140,15 @@ func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookC ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute) defer cancel() + if !cache.WaitForCacheSync(ctx.Done(), bce.informersSynced...) { + return fmt.Errorf("APF bootstrap ensurer timed out waiting for cache sync") + } + err = wait.PollImmediateUntilWithContext( ctx, time.Second, func(context.Context) (bool, error) { - if err := ensure(clientset); err != nil { + if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil { klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") return false, nil } @@ -141,7 +164,7 @@ func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookC wait.PollImmediateUntil( time.Minute, func() (bool, error) { - if err := ensure(clientset); err != nil { + if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil { klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") } // always auto update both suggested and mandatory configuration @@ -153,56 +176,56 @@ func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookC return nil } -func ensure(clientset flowcontrolclient.FlowcontrolV1beta2Interface) error { - if err := ensureSuggestedConfiguration(clientset); err != nil { +func ensure(clientset flowcontrolclient.FlowcontrolV1beta2Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { + if err := ensureSuggestedConfiguration(clientset, fsLister, plcLister); err != nil { // We should not attempt creation of mandatory objects if ensuring the suggested // configuration resulted in an error. // This only happens when the stop channel is closed. return fmt.Errorf("failed ensuring suggested settings - %w", err) } - if err := ensureMandatoryConfiguration(clientset); err != nil { + if err := ensureMandatoryConfiguration(clientset, fsLister, plcLister); err != nil { return fmt.Errorf("failed ensuring mandatory settings - %w", err) } - if err := removeConfiguration(clientset); err != nil { + if err := removeConfiguration(clientset, fsLister, plcLister); err != nil { return fmt.Errorf("failed to delete removed settings - %w", err) } return nil } -func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface) error { - fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas()) +func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { + fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister) if err := fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas); err != nil { return err } - plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations()) + plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations(), plcLister) return plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations) } -func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface) error { - fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas()) +func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { + fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister) if err := fsEnsurer.Ensure(flowcontrolbootstrap.MandatoryFlowSchemas); err != nil { return err } - plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations()) + plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations(), plcLister) return plEnsurer.Ensure(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations) } -func removeConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface) error { - if err := removeFlowSchema(clientset.FlowSchemas()); err != nil { +func removeConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { + if err := removeFlowSchema(clientset.FlowSchemas(), fsLister); err != nil { return err } - return removePriorityLevel(clientset.PriorityLevelConfigurations()) + return removePriorityLevel(clientset.PriorityLevelConfigurations(), plcLister) } -func removeFlowSchema(client flowcontrolclient.FlowSchemaInterface) error { +func removeFlowSchema(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) error { bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...) - candidates, err := ensurer.GetFlowSchemaRemoveCandidate(client, bootstrap) + candidates, err := ensurer.GetFlowSchemaRemoveCandidate(lister, bootstrap) if err != nil { return err } @@ -210,13 +233,13 @@ func removeFlowSchema(client flowcontrolclient.FlowSchemaInterface) error { return nil } - fsRemover := ensurer.NewFlowSchemaRemover(client) + fsRemover := ensurer.NewFlowSchemaRemover(client, lister) return fsRemover.Remove(candidates) } -func removePriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface) error { +func removePriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) error { bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...) - candidates, err := ensurer.GetPriorityLevelRemoveCandidate(client, bootstrap) + candidates, err := ensurer.GetPriorityLevelRemoveCandidate(lister, bootstrap) if err != nil { return err } @@ -224,7 +247,7 @@ func removePriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInte return nil } - plRemover := ensurer.NewPriorityLevelRemover(client) + plRemover := ensurer.NewPriorityLevelRemover(client, lister) return plRemover.Remove(candidates) }