Merge pull request #105800 from benluddy/apf-bootstrap-configuration-lister

Use a lister for bootstrap flowcontrol config objects.
This commit is contained in:
Kubernetes Prow Robot 2022-01-04 07:30:52 -08:00 committed by GitHub
commit 7dcad28ab0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 139 additions and 110 deletions

View File

@ -434,7 +434,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
rbacrest.RESTStorageProvider{Authorizer: c.GenericConfig.Authorization.Authorizer},
schedulingrest.RESTStorageProvider{},
storagerest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{},
flowcontrolrest.RESTStorageProvider{InformerFactory: c.GenericConfig.SharedInformerFactory},
// keep apps after extensions so legacy clients resolve the extensions versions of shared resource names.
// See https://github.com/kubernetes/kubernetes/issues/42392
appsrest.StorageProvider{},

View File

@ -24,9 +24,11 @@ import (
flowcontrolv1beta2 "k8s.io/api/flowcontrol/v1beta2"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2"
flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2"
)
@ -46,9 +48,10 @@ type FlowSchemaRemover interface {
// NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of suggested FlowSchema configuration objects.
func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client,
lister: lister,
}
return &fsEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper),
@ -58,9 +61,10 @@ func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface)
// NewMandatoryFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of mandatory FlowSchema configuration objects.
func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client,
lister: lister,
}
return &fsEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
@ -70,10 +74,11 @@ func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface)
// NewFlowSchemaRemover returns a FlowSchemaRemover instance that
// can be used to remove a set of FlowSchema configuration objects.
func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface) FlowSchemaRemover {
func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaRemover {
return &fsEnsurer{
wrapper: &flowSchemaWrapper{
client: client,
lister: lister,
},
}
}
@ -82,9 +87,8 @@ func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface) FlowSche
// names that are candidates for deletion from the cluster.
// bootstrap: a set of hard coded FlowSchema configuration objects
// kube-apiserver maintains in-memory.
func GetFlowSchemaRemoveCandidate(client flowcontrolclient.FlowSchemaInterface, bootstrap []*flowcontrolv1beta2.FlowSchema) ([]string, error) {
// TODO(101667): Use a lister here to avoid periodic LIST calls
fsList, err := client.List(context.TODO(), metav1.ListOptions{})
func GetFlowSchemaRemoveCandidate(lister flowcontrollisters.FlowSchemaLister, bootstrap []*flowcontrolv1beta2.FlowSchema) ([]string, error) {
fsList, err := lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list FlowSchema - %w", err)
}
@ -94,9 +98,9 @@ func GetFlowSchemaRemoveCandidate(client flowcontrolclient.FlowSchemaInterface,
bootstrapNames.Insert(bootstrap[i].GetName())
}
currentObjects := make([]metav1.Object, len(fsList.Items))
for i := range fsList.Items {
currentObjects[i] = &fsList.Items[i]
currentObjects := make([]metav1.Object, len(fsList))
for i := range fsList {
currentObjects[i] = fsList[i]
}
return getRemoveCandidate(bootstrapNames, currentObjects), nil
@ -131,6 +135,7 @@ func (e *fsEnsurer) Remove(flowSchemas []string) error {
// we can manage all boiler plate code in one place.
type flowSchemaWrapper struct {
client flowcontrolclient.FlowSchemaInterface
lister flowcontrollisters.FlowSchemaLister
}
func (fs *flowSchemaWrapper) TypeName() string {
@ -156,7 +161,7 @@ func (fs *flowSchemaWrapper) Update(object runtime.Object) (runtime.Object, erro
}
func (fs *flowSchemaWrapper) Get(name string) (configurationObject, error) {
return fs.client.Get(context.TODO(), name, metav1.GetOptions{})
return fs.lister.Get(name)
}
func (fs *flowSchemaWrapper) Delete(name string) error {

View File

@ -27,44 +27,41 @@ import (
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2"
"k8s.io/client-go/tools/cache"
flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
)
func TestEnsureFlowSchema(t *testing.T) {
tests := []struct {
name string
strategy func(flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer
strategy func(flowcontrolclient.FlowSchemaInterface, flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer
current *flowcontrolv1beta2.FlowSchema
bootstrap *flowcontrolv1beta2.FlowSchema
expected *flowcontrolv1beta2.FlowSchema
}{
// for suggested configurations
{
name: "suggested flow schema does not exist - the object should always be re-created",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client)
},
name: "suggested flow schema does not exist - the object should always be re-created",
strategy: NewSuggestedFlowSchemaEnsurer,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: nil,
expected: newFlowSchema("fs1", "pl1", 100).Object(),
},
{
name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client)
},
name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated",
strategy: NewSuggestedFlowSchemaEnsurer,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client)
},
name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: NewSuggestedFlowSchemaEnsurer,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
@ -72,28 +69,22 @@ func TestEnsureFlowSchema(t *testing.T) {
// for mandatory configurations
{
name: "mandatory flow schema does not exist - new object should be created",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewMandatoryFlowSchemaEnsurer(client)
},
name: "mandatory flow schema does not exist - new object should be created",
strategy: NewMandatoryFlowSchemaEnsurer,
bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
current: nil,
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory flow schema exists, annotation is missing - annotation should be added",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewMandatoryFlowSchemaEnsurer(client)
},
name: "mandatory flow schema exists, annotation is missing - annotation should be added",
strategy: NewMandatoryFlowSchemaEnsurer,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 100).Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewMandatoryFlowSchemaEnsurer(client)
},
name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated",
strategy: NewMandatoryFlowSchemaEnsurer,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
@ -103,11 +94,13 @@ func TestEnsureFlowSchema(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta2().FlowSchemas()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current)
}
ensurer := test.strategy(client)
ensurer := test.strategy(client, flowcontrollisters.NewFlowSchemaLister(indexer))
err := ensurer.Ensure([]*flowcontrolv1beta2.FlowSchema{test.bootstrap})
if err != nil {
@ -322,11 +315,13 @@ func TestRemoveFlowSchema(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta2().FlowSchemas()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current)
}
remover := NewFlowSchemaRemover(client)
remover := NewFlowSchemaRemover(client, flowcontrollisters.NewFlowSchemaLister(indexer))
err := remover.Remove([]string{test.bootstrapName})
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
@ -409,17 +404,20 @@ func TestGetFlowSchemaRemoveCandidate(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta2().FlowSchemas()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for i := range test.current {
client.Create(context.TODO(), test.current[i], metav1.CreateOptions{})
indexer.Add(test.current[i])
}
removeListGot, err := GetFlowSchemaRemoveCandidate(client, test.bootstrap)
lister := flowcontrollisters.NewFlowSchemaLister(indexer)
removeListGot, err := GetFlowSchemaRemoveCandidate(lister, test.bootstrap)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !cmp.Equal(test.expected, removeListGot) {
if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool {
return a < b
})) {
t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot))
}
})

View File

@ -24,9 +24,11 @@ import (
flowcontrolv1beta2 "k8s.io/api/flowcontrol/v1beta2"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2"
flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2"
)
@ -46,9 +48,10 @@ type PriorityLevelRemover interface {
// NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that
// can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects.
func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{
client: client,
lister: lister,
}
return &plEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper),
@ -58,9 +61,10 @@ func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLe
// NewMandatoryPriorityLevelEnsurer returns a PriorityLevelEnsurer instance that
// can be used to ensure a set of mandatory PriorityLevelConfiguration configuration objects.
func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{
client: client,
lister: lister,
}
return &plEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
@ -70,10 +74,11 @@ func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConf
// NewPriorityLevelRemover returns a PriorityLevelRemover instance that
// can be used to remove a set of PriorityLevelConfiguration configuration objects.
func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelRemover {
func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelRemover {
return &plEnsurer{
wrapper: &priorityLevelConfigurationWrapper{
client: client,
lister: lister,
},
}
}
@ -82,9 +87,8 @@ func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfiguration
// names that are candidates for removal from the cluster.
// bootstrap: a set of hard coded PriorityLevelConfiguration configuration
// objects kube-apiserver maintains in-memory.
func GetPriorityLevelRemoveCandidate(client flowcontrolclient.PriorityLevelConfigurationInterface, bootstrap []*flowcontrolv1beta2.PriorityLevelConfiguration) ([]string, error) {
// TODO(101667): Use a lister here to avoid periodic LIST calls
plList, err := client.List(context.TODO(), metav1.ListOptions{})
func GetPriorityLevelRemoveCandidate(lister flowcontrollisters.PriorityLevelConfigurationLister, bootstrap []*flowcontrolv1beta2.PriorityLevelConfiguration) ([]string, error) {
plList, err := lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list PriorityLevelConfiguration - %w", err)
}
@ -94,9 +98,9 @@ func GetPriorityLevelRemoveCandidate(client flowcontrolclient.PriorityLevelConfi
bootstrapNames.Insert(bootstrap[i].GetName())
}
currentObjects := make([]metav1.Object, len(plList.Items))
for i := range plList.Items {
currentObjects[i] = &plList.Items[i]
currentObjects := make([]metav1.Object, len(plList))
for i := range plList {
currentObjects[i] = plList[i]
}
return getRemoveCandidate(bootstrapNames, currentObjects), nil
@ -131,6 +135,7 @@ func (e *plEnsurer) Remove(priorityLevels []string) error {
// with this we can manage all boiler plate code in one place.
type priorityLevelConfigurationWrapper struct {
client flowcontrolclient.PriorityLevelConfigurationInterface
lister flowcontrollisters.PriorityLevelConfigurationLister
}
func (fs *priorityLevelConfigurationWrapper) TypeName() string {
@ -156,7 +161,7 @@ func (fs *priorityLevelConfigurationWrapper) Update(object runtime.Object) (runt
}
func (fs *priorityLevelConfigurationWrapper) Get(name string) (configurationObject, error) {
return fs.client.Get(context.TODO(), name, metav1.GetOptions{})
return fs.lister.Get(name)
}
func (fs *priorityLevelConfigurationWrapper) Delete(name string) error {

View File

@ -27,43 +27,41 @@ import (
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2"
"k8s.io/client-go/tools/cache"
flowcontrolapisv1beta2 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta2"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
)
func TestEnsurePriorityLevel(t *testing.T) {
tests := []struct {
name string
strategy func(flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer
strategy func(flowcontrolclient.PriorityLevelConfigurationInterface, flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer
current *flowcontrolv1beta2.PriorityLevelConfiguration
bootstrap *flowcontrolv1beta2.PriorityLevelConfiguration
expected *flowcontrolv1beta2.PriorityLevelConfiguration
}{
// for suggested configurations
{
name: "suggested priority level configuration does not exist - the object should always be re-created",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client)
},
name: "suggested priority level configuration does not exist - the object should always be re-created",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer,
bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(),
current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).Object(),
},
{
name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client)
}, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
},
{
name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client)
},
name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
@ -71,28 +69,22 @@ func TestEnsurePriorityLevel(t *testing.T) {
// for mandatory configurations
{
name: "mandatory priority level configuration does not exist - new object should be created",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewMandatoryPriorityLevelEnsurer(client)
},
name: "mandatory priority level configuration does not exist - new object should be created",
strategy: NewMandatoryPriorityLevelEnsurer,
bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory priority level configuration exists, annotation is missing - annotation is added",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewMandatoryPriorityLevelEnsurer(client)
},
name: "mandatory priority level configuration exists, annotation is missing - annotation is added",
strategy: NewMandatoryPriorityLevelEnsurer,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithLimited(20).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
},
{
name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewMandatoryPriorityLevelEnsurer(client)
},
name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated",
strategy: NewMandatoryPriorityLevelEnsurer,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
@ -101,13 +93,14 @@ func TestEnsurePriorityLevel(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta2().PriorityLevelConfigurations()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current)
}
ensurer := test.strategy(client)
ensurer := test.strategy(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer))
err := ensurer.Ensure([]*flowcontrolv1beta2.PriorityLevelConfiguration{test.bootstrap})
if err != nil {
@ -338,11 +331,13 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta2().PriorityLevelConfigurations()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current)
}
remover := NewPriorityLevelRemover(client)
remover := NewPriorityLevelRemover(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer))
err := remover.Remove([]string{test.bootstrapName})
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
@ -425,17 +420,20 @@ func TestGetPriorityLevelRemoveCandidate(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta2().PriorityLevelConfigurations()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for i := range test.current {
client.Create(context.TODO(), test.current[i], metav1.CreateOptions{})
indexer.Add(test.current[i])
}
removeListGot, err := GetPriorityLevelRemoveCandidate(client, test.bootstrap)
lister := flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)
removeListGot, err := GetPriorityLevelRemoveCandidate(lister, test.bootstrap)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !cmp.Equal(test.expected, removeListGot) {
if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool {
return a < b
})) {
t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot))
}
})

View File

@ -27,7 +27,10 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
genericapiserver "k8s.io/apiserver/pkg/server"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/client-go/informers"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta2"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta2"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/api/legacyscheme"
"k8s.io/kubernetes/pkg/apis/flowcontrol"
@ -42,7 +45,9 @@ import (
var _ genericapiserver.PostStartHookProvider = RESTStorageProvider{}
// RESTStorageProvider is a provider of REST storage
type RESTStorageProvider struct{}
type RESTStorageProvider struct {
InformerFactory informers.SharedInformerFactory
}
// PostStartHookName is the name of the post-start-hook provided by flow-control storage
const PostStartHookName = "priority-and-fairness-config-producer"
@ -107,10 +112,24 @@ func (p RESTStorageProvider) GroupName() string {
// PostStartHook returns the hook func that launches the config provider
func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return PostStartHookName, ensureAPFBootstrapConfiguration, nil
bce := &bootstrapConfigurationEnsurer{
informersSynced: []cache.InformerSynced{
p.InformerFactory.Flowcontrol().V1beta2().PriorityLevelConfigurations().Informer().HasSynced,
p.InformerFactory.Flowcontrol().V1beta2().FlowSchemas().Informer().HasSynced,
},
fsLister: p.InformerFactory.Flowcontrol().V1beta2().FlowSchemas().Lister(),
plcLister: p.InformerFactory.Flowcontrol().V1beta2().PriorityLevelConfigurations().Lister(),
}
return PostStartHookName, bce.ensureAPFBootstrapConfiguration, nil
}
func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookContext) error {
type bootstrapConfigurationEnsurer struct {
informersSynced []cache.InformerSynced
fsLister flowcontrollisters.FlowSchemaLister
plcLister flowcontrollisters.PriorityLevelConfigurationLister
}
func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookContext) error {
clientset, err := flowcontrolclient.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize clientset for APF - %w", err)
@ -121,11 +140,15 @@ func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookC
ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute)
defer cancel()
if !cache.WaitForCacheSync(ctx.Done(), bce.informersSynced...) {
return fmt.Errorf("APF bootstrap ensurer timed out waiting for cache sync")
}
err = wait.PollImmediateUntilWithContext(
ctx,
time.Second,
func(context.Context) (bool, error) {
if err := ensure(clientset); err != nil {
if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
return false, nil
}
@ -141,7 +164,7 @@ func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookC
wait.PollImmediateUntil(
time.Minute,
func() (bool, error) {
if err := ensure(clientset); err != nil {
if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
}
// always auto update both suggested and mandatory configuration
@ -153,56 +176,56 @@ func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookC
return nil
}
func ensure(clientset flowcontrolclient.FlowcontrolV1beta2Interface) error {
if err := ensureSuggestedConfiguration(clientset); err != nil {
func ensure(clientset flowcontrolclient.FlowcontrolV1beta2Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
if err := ensureSuggestedConfiguration(clientset, fsLister, plcLister); err != nil {
// We should not attempt creation of mandatory objects if ensuring the suggested
// configuration resulted in an error.
// This only happens when the stop channel is closed.
return fmt.Errorf("failed ensuring suggested settings - %w", err)
}
if err := ensureMandatoryConfiguration(clientset); err != nil {
if err := ensureMandatoryConfiguration(clientset, fsLister, plcLister); err != nil {
return fmt.Errorf("failed ensuring mandatory settings - %w", err)
}
if err := removeConfiguration(clientset); err != nil {
if err := removeConfiguration(clientset, fsLister, plcLister); err != nil {
return fmt.Errorf("failed to delete removed settings - %w", err)
}
return nil
}
func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface) error {
fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas())
func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister)
if err := fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas); err != nil {
return err
}
plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations())
plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations(), plcLister)
return plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
}
func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface) error {
fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas())
func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister)
if err := fsEnsurer.Ensure(flowcontrolbootstrap.MandatoryFlowSchemas); err != nil {
return err
}
plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations())
plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations(), plcLister)
return plEnsurer.Ensure(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations)
}
func removeConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface) error {
if err := removeFlowSchema(clientset.FlowSchemas()); err != nil {
func removeConfiguration(clientset flowcontrolclient.FlowcontrolV1beta2Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
if err := removeFlowSchema(clientset.FlowSchemas(), fsLister); err != nil {
return err
}
return removePriorityLevel(clientset.PriorityLevelConfigurations())
return removePriorityLevel(clientset.PriorityLevelConfigurations(), plcLister)
}
func removeFlowSchema(client flowcontrolclient.FlowSchemaInterface) error {
func removeFlowSchema(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) error {
bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...)
candidates, err := ensurer.GetFlowSchemaRemoveCandidate(client, bootstrap)
candidates, err := ensurer.GetFlowSchemaRemoveCandidate(lister, bootstrap)
if err != nil {
return err
}
@ -210,13 +233,13 @@ func removeFlowSchema(client flowcontrolclient.FlowSchemaInterface) error {
return nil
}
fsRemover := ensurer.NewFlowSchemaRemover(client)
fsRemover := ensurer.NewFlowSchemaRemover(client, lister)
return fsRemover.Remove(candidates)
}
func removePriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface) error {
func removePriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) error {
bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...)
candidates, err := ensurer.GetPriorityLevelRemoveCandidate(client, bootstrap)
candidates, err := ensurer.GetPriorityLevelRemoveCandidate(lister, bootstrap)
if err != nil {
return err
}
@ -224,7 +247,7 @@ func removePriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInte
return nil
}
plRemover := ensurer.NewPriorityLevelRemover(client)
plRemover := ensurer.NewPriorityLevelRemover(client, lister)
return plRemover.Remove(candidates)
}