Merge pull request #111422 from MikeSpreitzer/objectify-apf-maintenance

Improve and simplify maintenance of APF bootstrap objects without type assertions
This commit is contained in:
Kubernetes Prow Robot 2023-05-05 09:17:13 -07:00 committed by GitHub
commit 929097ae64
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 510 additions and 714 deletions

View File

@ -18,190 +18,128 @@ package ensurer
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
) )
var ( // WrapBootstrapFlowSchemas creates a generic representation of the given bootstrap objects bound with their operations
errObjectNotFlowSchema = errors.New("object is not a FlowSchema type") // Every object in `boots` is immutable.
) func WrapBootstrapFlowSchemas(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister, boots []*flowcontrolv1beta3.FlowSchema) BootstrapObjects {
return &bootstrapFlowSchemas{
// FlowSchemaEnsurer ensures the specified bootstrap configuration objects flowSchemaClient: flowSchemaClient{
type FlowSchemaEnsurer interface {
Ensure([]*flowcontrolv1beta3.FlowSchema) error
}
// FlowSchemaRemover is the interface that wraps the
// RemoveAutoUpdateEnabledObjects method.
//
// RemoveAutoUpdateEnabledObjects removes a set of bootstrap FlowSchema
// objects specified via their names. The function removes an object
// only if automatic update of the spec is enabled for it.
type FlowSchemaRemover interface {
RemoveAutoUpdateEnabledObjects([]string) error
}
// NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of suggested FlowSchema configuration objects.
func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client, client: client,
lister: lister, lister: lister},
} boots: boots,
return &fsEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper),
wrapper: wrapper,
} }
} }
// NewMandatoryFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that type flowSchemaClient struct {
// can be used to ensure a set of mandatory FlowSchema configuration objects.
func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client,
lister: lister,
}
return &fsEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
wrapper: wrapper,
}
}
// NewFlowSchemaRemover returns a FlowSchemaRemover instance that
// can be used to remove a set of FlowSchema configuration objects.
func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaRemover {
return &fsEnsurer{
wrapper: &flowSchemaWrapper{
client: client,
lister: lister,
},
}
}
// GetFlowSchemaRemoveCandidates returns a list of FlowSchema object
// names that are candidates for deletion from the cluster.
// bootstrap: a set of hard coded FlowSchema configuration objects
// kube-apiserver maintains in-memory.
func GetFlowSchemaRemoveCandidates(lister flowcontrollisters.FlowSchemaLister, bootstrap []*flowcontrolv1beta3.FlowSchema) ([]string, error) {
fsList, err := lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list FlowSchema - %w", err)
}
bootstrapNames := sets.String{}
for i := range bootstrap {
bootstrapNames.Insert(bootstrap[i].GetName())
}
currentObjects := make([]metav1.Object, len(fsList))
for i := range fsList {
currentObjects[i] = fsList[i]
}
return getDanglingBootstrapObjectNames(bootstrapNames, currentObjects), nil
}
type fsEnsurer struct {
strategy ensureStrategy
wrapper configurationWrapper
}
func (e *fsEnsurer) Ensure(flowSchemas []*flowcontrolv1beta3.FlowSchema) error {
for _, flowSchema := range flowSchemas {
// This code gets called by different goroutines. To avoid race conditions when
// https://github.com/kubernetes/kubernetes/blob/330b5a2b8dbd681811cb8235947557c99dd8e593/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go#L221-L243
// temporarily modifies the TypeMeta, we have to make a copy here.
if err := ensureConfiguration(e.wrapper, e.strategy, flowSchema.DeepCopy()); err != nil {
return err
}
}
return nil
}
func (e *fsEnsurer) RemoveAutoUpdateEnabledObjects(flowSchemas []string) error {
for _, flowSchema := range flowSchemas {
if err := removeAutoUpdateEnabledConfiguration(e.wrapper, flowSchema); err != nil {
return err
}
}
return nil
}
// flowSchemaWrapper abstracts all FlowSchema specific logic, with this
// we can manage all boiler plate code in one place.
type flowSchemaWrapper struct {
client flowcontrolclient.FlowSchemaInterface client flowcontrolclient.FlowSchemaInterface
lister flowcontrollisters.FlowSchemaLister lister flowcontrollisters.FlowSchemaLister
} }
func (fs *flowSchemaWrapper) TypeName() string { type bootstrapFlowSchemas struct {
flowSchemaClient
// Every member is a pointer to immutable content
boots []*flowcontrolv1beta3.FlowSchema
}
func (*flowSchemaClient) typeName() string {
return "FlowSchema" return "FlowSchema"
} }
func (fs *flowSchemaWrapper) Create(object runtime.Object) (runtime.Object, error) { func (boots *bootstrapFlowSchemas) len() int {
fsObject, ok := object.(*flowcontrolv1beta3.FlowSchema) return len(boots.boots)
if !ok {
return nil, errObjectNotFlowSchema
} }
return fs.client.Create(context.TODO(), fsObject, metav1.CreateOptions{FieldManager: fieldManager}) func (boots *bootstrapFlowSchemas) get(i int) bootstrapObject {
return &bootstrapFlowSchema{
flowSchemaClient: &boots.flowSchemaClient,
bootstrap: boots.boots[i],
}
} }
func (fs *flowSchemaWrapper) Update(object runtime.Object) (runtime.Object, error) { func (boots *bootstrapFlowSchemas) getExistingObjects() ([]deletable, error) {
fsObject, ok := object.(*flowcontrolv1beta3.FlowSchema) objs, err := boots.lister.List(labels.Everything())
if !ok { if err != nil {
return nil, errObjectNotFlowSchema return nil, fmt.Errorf("failed to list FlowSchema objects - %w", err)
}
dels := make([]deletable, len(objs))
for i, obj := range objs {
dels[i] = &deletableFlowSchema{
FlowSchema: obj,
client: boots.client,
}
}
return dels, nil
} }
return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager}) type bootstrapFlowSchema struct {
*flowSchemaClient
// points to immutable contnet
bootstrap *flowcontrolv1beta3.FlowSchema
} }
func (fs *flowSchemaWrapper) Get(name string) (configurationObject, error) { func (boot *bootstrapFlowSchema) getName() string {
return fs.lister.Get(name) return boot.bootstrap.Name
} }
func (fs *flowSchemaWrapper) Delete(name string) error { func (boot *bootstrapFlowSchema) create(ctx context.Context) error {
return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{}) // Copy the object here because the Encoder in the client code may modify the object; see
// https://github.com/kubernetes/kubernetes/pull/117107
// and WithVersionEncoder in apimachinery/pkg/runtime/helper.go.
_, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager})
return err
} }
func (fs *flowSchemaWrapper) CopySpec(bootstrap, current runtime.Object) error { func (boot *bootstrapFlowSchema) getCurrent() (wantAndHave, error) {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.FlowSchema) current, err := boot.lister.Get(boot.bootstrap.Name)
if !ok { if err != nil {
return errObjectNotFlowSchema return nil, err
} }
currentFS, ok := current.(*flowcontrolv1beta3.FlowSchema) return &wantAndHaveFlowSchema{
if !ok { client: boot.client,
return errObjectNotFlowSchema want: boot.bootstrap,
have: current,
}, nil
} }
specCopy := bootstrapFS.Spec.DeepCopy() type wantAndHaveFlowSchema struct {
currentFS.Spec = *specCopy client flowcontrolclient.FlowSchemaInterface
return nil want *flowcontrolv1beta3.FlowSchema
have *flowcontrolv1beta3.FlowSchema
} }
func (fs *flowSchemaWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) { func (wah *wantAndHaveFlowSchema) getWant() configurationObject {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.FlowSchema) return wah.want
if !ok {
return false, errObjectNotFlowSchema
}
currentFS, ok := current.(*flowcontrolv1beta3.FlowSchema)
if !ok {
return false, errObjectNotFlowSchema
} }
return flowSchemaSpecChanged(bootstrapFS, currentFS), nil func (wah *wantAndHaveFlowSchema) getHave() configurationObject {
return wah.have
}
func (wah *wantAndHaveFlowSchema) copyHave(specFromWant bool) updatable {
copy := wah.have.DeepCopy()
if specFromWant {
copy.Spec = *wah.want.Spec.DeepCopy()
}
return &updatableFlowSchema{
FlowSchema: copy,
client: wah.client,
}
}
func (wah *wantAndHaveFlowSchema) specsDiffer() bool {
return flowSchemaSpecChanged(wah.want, wah.have)
} }
func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool { func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool {
@ -209,3 +147,23 @@ func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool
flowcontrolapisv1beta3.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema) flowcontrolapisv1beta3.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec) return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec)
} }
type updatableFlowSchema struct {
*flowcontrolv1beta3.FlowSchema
client flowcontrolclient.FlowSchemaInterface
}
func (u *updatableFlowSchema) update(ctx context.Context) error {
_, err := u.client.Update(ctx, u.FlowSchema, metav1.UpdateOptions{FieldManager: fieldManager})
return err
}
type deletableFlowSchema struct {
*flowcontrolv1beta3.FlowSchema
client flowcontrolclient.FlowSchemaInterface
}
func (dbl *deletableFlowSchema) delete(ctx context.Context /* TODO: resourceVersion string */) error {
// return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}})
return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{})
}

View File

@ -26,20 +26,23 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func init() {
klog.InitFlags(nil)
}
func TestEnsureFlowSchema(t *testing.T) { func TestEnsureFlowSchema(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
strategy func(flowcontrolclient.FlowSchemaInterface, flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer strategy func() EnsureStrategy
current *flowcontrolv1beta3.FlowSchema current *flowcontrolv1beta3.FlowSchema
bootstrap *flowcontrolv1beta3.FlowSchema bootstrap *flowcontrolv1beta3.FlowSchema
expected *flowcontrolv1beta3.FlowSchema expected *flowcontrolv1beta3.FlowSchema
@ -47,21 +50,21 @@ func TestEnsureFlowSchema(t *testing.T) {
// for suggested configurations // for suggested configurations
{ {
name: "suggested flow schema does not exist - the object should always be re-created", name: "suggested flow schema does not exist - the object should always be re-created",
strategy: NewSuggestedFlowSchemaEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: nil, current: nil,
expected: newFlowSchema("fs1", "pl1", 100).Object(), expected: newFlowSchema("fs1", "pl1", 100).Object(),
}, },
{ {
name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated", name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated",
strategy: NewSuggestedFlowSchemaEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
}, },
{ {
name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated", name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: NewSuggestedFlowSchemaEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
@ -70,21 +73,21 @@ func TestEnsureFlowSchema(t *testing.T) {
// for mandatory configurations // for mandatory configurations
{ {
name: "mandatory flow schema does not exist - new object should be created", name: "mandatory flow schema does not exist - new object should be created",
strategy: NewMandatoryFlowSchemaEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
current: nil, current: nil,
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
}, },
{ {
name: "mandatory flow schema exists, annotation is missing - annotation should be added", name: "mandatory flow schema exists, annotation is missing - annotation should be added",
strategy: NewMandatoryFlowSchemaEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 100).Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
}, },
{ {
name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated", name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated",
strategy: NewMandatoryFlowSchemaEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
@ -100,9 +103,9 @@ func TestEnsureFlowSchema(t *testing.T) {
indexer.Add(test.current) indexer.Add(test.current)
} }
ensurer := test.strategy(client, flowcontrollisters.NewFlowSchemaLister(indexer)) boots := WrapBootstrapFlowSchemas(client, flowcontrollisters.NewFlowSchemaLister(indexer), []*flowcontrolv1beta3.FlowSchema{test.bootstrap})
strategy := test.strategy()
err := ensurer.Ensure([]*flowcontrolv1beta3.FlowSchema{test.bootstrap}) err := EnsureConfigurations(context.Background(), boots, strategy)
if err != nil { if err != nil {
t.Fatalf("Expected no error, but got: %v", err) t.Fatalf("Expected no error, but got: %v", err)
} }
@ -207,12 +210,19 @@ func TestSuggestedFSEnsureStrategy_ShouldUpdate(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
strategy := newSuggestedEnsureStrategy(&flowSchemaWrapper{}) wah := &wantAndHaveFlowSchema{
newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) want: test.bootstrap,
have: test.current,
}
strategy := NewSuggestedEnsureStrategy()
updatableGot, updateGot, err := strategy.ShouldUpdate(wah)
if err != nil { if err != nil {
t.Errorf("Expected no error, but got: %v", err) t.Errorf("Expected no error, but got: %v", err)
} }
var newObjectGot *flowcontrolv1beta3.FlowSchema
if updatableGot != nil {
newObjectGot = updatableGot.(*updatableFlowSchema).FlowSchema
}
if test.newObjectExpected == nil { if test.newObjectExpected == nil {
if newObjectGot != nil { if newObjectGot != nil {
t.Errorf("Expected a nil object, but got: %#v", newObjectGot) t.Errorf("Expected a nil object, but got: %#v", newObjectGot)
@ -288,24 +298,42 @@ func TestRemoveFlowSchema(t *testing.T) {
removeExpected bool removeExpected bool
}{ }{
{ {
name: "flow schema does not exist", name: "no flow schema objects exist",
bootstrapName: "fs1", bootstrapName: "fs1",
current: nil, current: nil,
}, },
{ {
name: "flow schema exists, auto update is enabled", name: "flow schema unwanted, auto update is enabled",
bootstrapName: "fs1", bootstrapName: "fs0",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
removeExpected: true, removeExpected: true,
}, },
{ {
name: "flow schema exists, auto update is disabled", name: "flow schema unwanted, auto update is disabled",
bootstrapName: "fs0",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
removeExpected: false,
},
{
name: "flow schema unwanted, the auto-update annotation is malformed",
bootstrapName: "fs0",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false,
},
{
name: "flow schema wanted, auto update is enabled",
bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
removeExpected: false,
},
{
name: "flow schema wanted, auto update is disabled",
bootstrapName: "fs1", bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
removeExpected: false, removeExpected: false,
}, },
{ {
name: "flow schema exists, the auto-update annotation is malformed", name: "flow schema wanted, the auto-update annotation is malformed",
bootstrapName: "fs1", bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false, removeExpected: false,
@ -320,9 +348,10 @@ func TestRemoveFlowSchema(t *testing.T) {
client.Create(context.TODO(), test.current, metav1.CreateOptions{}) client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current) indexer.Add(test.current)
} }
bootFS := newFlowSchema(test.bootstrapName, "pl", 100).Object()
boots := WrapBootstrapFlowSchemas(client, flowcontrollisters.NewFlowSchemaLister(indexer), []*flowcontrolv1beta3.FlowSchema{bootFS})
err := RemoveUnwantedObjects(context.Background(), boots)
remover := NewFlowSchemaRemover(client, flowcontrollisters.NewFlowSchemaLister(indexer))
err := remover.RemoveAutoUpdateEnabledObjects([]string{test.bootstrapName})
if err != nil { if err != nil {
t.Fatalf("Expected no error, but got: %v", err) t.Fatalf("Expected no error, but got: %v", err)
} }
@ -330,100 +359,21 @@ func TestRemoveFlowSchema(t *testing.T) {
if test.current == nil { if test.current == nil {
return return
} }
_, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{}) _, err = client.Get(context.TODO(), test.current.Name, metav1.GetOptions{})
switch { switch {
case test.removeExpected: case test.removeExpected:
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
t.Errorf("Expected error: %q, but got: %v", metav1.StatusReasonNotFound, err) t.Errorf("Expected error from Get after Delete: %q, but got: %v", metav1.StatusReasonNotFound, err)
} }
default: default:
if err != nil { if err != nil {
t.Errorf("Expected no error, but got: %v", err) t.Errorf("Expected no error from Get after Delete, but got: %v", err)
} }
} }
}) })
} }
} }
func TestGetFlowSchemaRemoveCandidate(t *testing.T) {
tests := []struct {
name string
current []*flowcontrolv1beta3.FlowSchema
bootstrap []*flowcontrolv1beta3.FlowSchema
expected []string
}{
{
name: "no object has been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{},
},
{
name: "bootstrap is empty, all current objects with the annotation should be candidates",
bootstrap: []*flowcontrolv1beta3.FlowSchema{},
current: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).Object(),
},
expected: []string{"fs1", "fs2"},
},
{
name: "object(s) have been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{"fs2", "fs3"},
},
{
name: "object(s) without the annotation key are ignored",
bootstrap: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).Object(),
newFlowSchema("fs3", "pl3", 300).Object(),
},
expected: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for i := range test.current {
indexer.Add(test.current[i])
}
lister := flowcontrollisters.NewFlowSchemaLister(indexer)
removeListGot, err := GetFlowSchemaRemoveCandidates(lister, test.bootstrap)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool {
return a < b
})) {
t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot))
}
})
}
}
type fsBuilder struct { type fsBuilder struct {
object *flowcontrolv1beta3.FlowSchema object *flowcontrolv1beta3.FlowSchema
} }

View File

@ -18,191 +18,128 @@ package ensurer
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
) )
var ( // WrapBootstrapPriorityLevelConfigurations creates a generic representation of the given bootstrap objects bound with their operations.
errObjectNotPriorityLevel = errors.New("object is not a PriorityLevelConfiguration type") // Every object in `boots` is immutable.
) func WrapBootstrapPriorityLevelConfigurations(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister, boots []*flowcontrolv1beta3.PriorityLevelConfiguration) BootstrapObjects {
return &bootstrapPriorityLevelConfigurations{
// PriorityLevelEnsurer ensures the specified bootstrap configuration objects priorityLevelConfigurationClient: priorityLevelConfigurationClient{
type PriorityLevelEnsurer interface {
Ensure([]*flowcontrolv1beta3.PriorityLevelConfiguration) error
}
// PriorityLevelRemover is the interface that wraps the
// RemoveAutoUpdateEnabledObjects method.
//
// RemoveAutoUpdateEnabledObjects removes a set of bootstrap
// PriorityLevelConfiguration objects specified via their names.
// The function removes an object only if automatic update
// of the spec is enabled for it.
type PriorityLevelRemover interface {
RemoveAutoUpdateEnabledObjects([]string) error
}
// NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that
// can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects.
func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{
client: client, client: client,
lister: lister, lister: lister},
} boots: boots,
return &plEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper),
wrapper: wrapper,
} }
} }
// NewMandatoryPriorityLevelEnsurer returns a PriorityLevelEnsurer instance that type priorityLevelConfigurationClient struct {
// can be used to ensure a set of mandatory PriorityLevelConfiguration configuration objects.
func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{
client: client,
lister: lister,
}
return &plEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
wrapper: wrapper,
}
}
// NewPriorityLevelRemover returns a PriorityLevelRemover instance that
// can be used to remove a set of PriorityLevelConfiguration configuration objects.
func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelRemover {
return &plEnsurer{
wrapper: &priorityLevelConfigurationWrapper{
client: client,
lister: lister,
},
}
}
// GetPriorityLevelRemoveCandidates returns a list of PriorityLevelConfiguration
// names that are candidates for removal from the cluster.
// bootstrap: a set of hard coded PriorityLevelConfiguration configuration
// objects kube-apiserver maintains in-memory.
func GetPriorityLevelRemoveCandidates(lister flowcontrollisters.PriorityLevelConfigurationLister, bootstrap []*flowcontrolv1beta3.PriorityLevelConfiguration) ([]string, error) {
plList, err := lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list PriorityLevelConfiguration - %w", err)
}
bootstrapNames := sets.String{}
for i := range bootstrap {
bootstrapNames.Insert(bootstrap[i].GetName())
}
currentObjects := make([]metav1.Object, len(plList))
for i := range plList {
currentObjects[i] = plList[i]
}
return getDanglingBootstrapObjectNames(bootstrapNames, currentObjects), nil
}
type plEnsurer struct {
strategy ensureStrategy
wrapper configurationWrapper
}
func (e *plEnsurer) Ensure(priorityLevels []*flowcontrolv1beta3.PriorityLevelConfiguration) error {
for _, priorityLevel := range priorityLevels {
// This code gets called by different goroutines. To avoid race conditions when
// https://github.com/kubernetes/kubernetes/blob/330b5a2b8dbd681811cb8235947557c99dd8e593/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go#L221-L243
// temporarily modifies the TypeMeta, we have to make a copy here.
if err := ensureConfiguration(e.wrapper, e.strategy, priorityLevel.DeepCopy()); err != nil {
return err
}
}
return nil
}
func (e *plEnsurer) RemoveAutoUpdateEnabledObjects(priorityLevels []string) error {
for _, priorityLevel := range priorityLevels {
if err := removeAutoUpdateEnabledConfiguration(e.wrapper, priorityLevel); err != nil {
return err
}
}
return nil
}
// priorityLevelConfigurationWrapper abstracts all PriorityLevelConfiguration specific logic,
// with this we can manage all boiler plate code in one place.
type priorityLevelConfigurationWrapper struct {
client flowcontrolclient.PriorityLevelConfigurationInterface client flowcontrolclient.PriorityLevelConfigurationInterface
lister flowcontrollisters.PriorityLevelConfigurationLister lister flowcontrollisters.PriorityLevelConfigurationLister
} }
func (fs *priorityLevelConfigurationWrapper) TypeName() string { type bootstrapPriorityLevelConfigurations struct {
priorityLevelConfigurationClient
// Every member is a pointer to immutable content
boots []*flowcontrolv1beta3.PriorityLevelConfiguration
}
func (*priorityLevelConfigurationClient) typeName() string {
return "PriorityLevelConfiguration" return "PriorityLevelConfiguration"
} }
func (fs *priorityLevelConfigurationWrapper) Create(object runtime.Object) (runtime.Object, error) { func (boots *bootstrapPriorityLevelConfigurations) len() int {
plObject, ok := object.(*flowcontrolv1beta3.PriorityLevelConfiguration) return len(boots.boots)
if !ok {
return nil, errObjectNotPriorityLevel
} }
return fs.client.Create(context.TODO(), plObject, metav1.CreateOptions{FieldManager: fieldManager}) func (boots *bootstrapPriorityLevelConfigurations) get(i int) bootstrapObject {
return &bootstrapPriorityLevelConfiguration{
priorityLevelConfigurationClient: &boots.priorityLevelConfigurationClient,
bootstrap: boots.boots[i],
}
} }
func (fs *priorityLevelConfigurationWrapper) Update(object runtime.Object) (runtime.Object, error) { func (boots *bootstrapPriorityLevelConfigurations) getExistingObjects() ([]deletable, error) {
fsObject, ok := object.(*flowcontrolv1beta3.PriorityLevelConfiguration) objs, err := boots.lister.List(labels.Everything())
if !ok { if err != nil {
return nil, errObjectNotPriorityLevel return nil, fmt.Errorf("failed to list PriorityLevelConfiguration objects - %w", err)
}
dels := make([]deletable, len(objs))
for i, obj := range objs {
dels[i] = &deletablePriorityLevelConfiguration{
PriorityLevelConfiguration: obj,
client: boots.client,
}
}
return dels, nil
} }
return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager}) type bootstrapPriorityLevelConfiguration struct {
*priorityLevelConfigurationClient
// points to immutable content
bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration
} }
func (fs *priorityLevelConfigurationWrapper) Get(name string) (configurationObject, error) { func (boot *bootstrapPriorityLevelConfiguration) getName() string {
return fs.lister.Get(name) return boot.bootstrap.Name
} }
func (fs *priorityLevelConfigurationWrapper) Delete(name string) error { func (boot *bootstrapPriorityLevelConfiguration) create(ctx context.Context) error {
return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{}) // Copy the object here because the Encoder in the client code may modify the object; see
// https://github.com/kubernetes/kubernetes/pull/117107
// and WithVersionEncoder in apimachinery/pkg/runtime/helper.go.
_, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager})
return err
} }
func (fs *priorityLevelConfigurationWrapper) CopySpec(bootstrap, current runtime.Object) error { func (boot *bootstrapPriorityLevelConfiguration) getCurrent() (wantAndHave, error) {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.PriorityLevelConfiguration) current, err := boot.lister.Get(boot.bootstrap.Name)
if !ok { if err != nil {
return errObjectNotPriorityLevel return nil, err
} }
currentFS, ok := current.(*flowcontrolv1beta3.PriorityLevelConfiguration) return &wantAndHavePriorityLevelConfiguration{
if !ok { client: boot.client,
return errObjectNotPriorityLevel want: boot.bootstrap,
have: current,
}, nil
} }
specCopy := bootstrapFS.Spec.DeepCopy() type wantAndHavePriorityLevelConfiguration struct {
currentFS.Spec = *specCopy client flowcontrolclient.PriorityLevelConfigurationInterface
return nil want *flowcontrolv1beta3.PriorityLevelConfiguration
have *flowcontrolv1beta3.PriorityLevelConfiguration
} }
func (fs *priorityLevelConfigurationWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) { func (wah *wantAndHavePriorityLevelConfiguration) getWant() configurationObject {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.PriorityLevelConfiguration) return wah.want
if !ok {
return false, errObjectNotPriorityLevel
}
currentFS, ok := current.(*flowcontrolv1beta3.PriorityLevelConfiguration)
if !ok {
return false, errObjectNotPriorityLevel
} }
return priorityLevelSpecChanged(bootstrapFS, currentFS), nil func (wah *wantAndHavePriorityLevelConfiguration) getHave() configurationObject {
return wah.have
}
func (wah *wantAndHavePriorityLevelConfiguration) copyHave(specFromWant bool) updatable {
copy := wah.have.DeepCopy()
if specFromWant {
copy.Spec = *wah.want.Spec.DeepCopy()
}
return &updatablePriorityLevelConfiguration{
PriorityLevelConfiguration: copy,
client: wah.client,
}
}
func (wah *wantAndHavePriorityLevelConfiguration) specsDiffer() bool {
return priorityLevelSpecChanged(wah.want, wah.have)
} }
func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevelConfiguration) bool { func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevelConfiguration) bool {
@ -210,3 +147,23 @@ func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevel
flowcontrolapisv1beta3.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel) flowcontrolapisv1beta3.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel)
return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec) return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec)
} }
type updatablePriorityLevelConfiguration struct {
*flowcontrolv1beta3.PriorityLevelConfiguration
client flowcontrolclient.PriorityLevelConfigurationInterface
}
func (u *updatablePriorityLevelConfiguration) update(ctx context.Context) error {
_, err := u.client.Update(ctx, u.PriorityLevelConfiguration, metav1.UpdateOptions{FieldManager: fieldManager})
return err
}
type deletablePriorityLevelConfiguration struct {
*flowcontrolv1beta3.PriorityLevelConfiguration
client flowcontrolclient.PriorityLevelConfigurationInterface
}
func (dbl *deletablePriorityLevelConfiguration) delete(ctx context.Context /* resourceVersion string */) error {
// return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}})
return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{})
}

View File

@ -26,20 +26,18 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
) )
func TestEnsurePriorityLevel(t *testing.T) { func TestEnsurePriorityLevel(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
strategy func(flowcontrolclient.PriorityLevelConfigurationInterface, flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer strategy func() EnsureStrategy
current *flowcontrolv1beta3.PriorityLevelConfiguration current *flowcontrolv1beta3.PriorityLevelConfiguration
bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration
expected *flowcontrolv1beta3.PriorityLevelConfiguration expected *flowcontrolv1beta3.PriorityLevelConfiguration
@ -47,21 +45,21 @@ func TestEnsurePriorityLevel(t *testing.T) {
// for suggested configurations // for suggested configurations
{ {
name: "suggested priority level configuration does not exist - the object should always be re-created", name: "suggested priority level configuration does not exist - the object should always be re-created",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(),
current: nil, current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithLimited(10).Object(),
}, },
{ {
name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated", name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
}, },
{ {
name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated", name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
@ -70,21 +68,21 @@ func TestEnsurePriorityLevel(t *testing.T) {
// for mandatory configurations // for mandatory configurations
{ {
name: "mandatory priority level configuration does not exist - new object should be created", name: "mandatory priority level configuration does not exist - new object should be created",
strategy: NewMandatoryPriorityLevelEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
current: nil, current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
}, },
{ {
name: "mandatory priority level configuration exists, annotation is missing - annotation is added", name: "mandatory priority level configuration exists, annotation is missing - annotation is added",
strategy: NewMandatoryPriorityLevelEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithLimited(20).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
}, },
{ {
name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated", name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated",
strategy: NewMandatoryPriorityLevelEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
@ -100,9 +98,10 @@ func TestEnsurePriorityLevel(t *testing.T) {
indexer.Add(test.current) indexer.Add(test.current)
} }
ensurer := test.strategy(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)) boots := WrapBootstrapPriorityLevelConfigurations(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer), []*flowcontrolv1beta3.PriorityLevelConfiguration{test.bootstrap})
strategy := test.strategy()
err := ensurer.Ensure([]*flowcontrolv1beta3.PriorityLevelConfiguration{test.bootstrap}) err := EnsureConfigurations(context.Background(), boots, strategy)
if err != nil { if err != nil {
t.Fatalf("Expected no error, but got: %v", err) t.Fatalf("Expected no error, but got: %v", err)
} }
@ -207,12 +206,19 @@ func TestSuggestedPLEnsureStrategy_ShouldUpdate(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
strategy := newSuggestedEnsureStrategy(&priorityLevelConfigurationWrapper{}) strategy := NewSuggestedEnsureStrategy()
newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) wah := &wantAndHavePriorityLevelConfiguration{
want: test.bootstrap,
have: test.current,
}
updatableGot, updateGot, err := strategy.ShouldUpdate(wah)
if err != nil { if err != nil {
t.Errorf("Expected no error, but got: %v", err) t.Errorf("Expected no error, but got: %v", err)
} }
var newObjectGot *flowcontrolv1beta3.PriorityLevelConfiguration
if updatableGot != nil {
newObjectGot = updatableGot.(*updatablePriorityLevelConfiguration).PriorityLevelConfiguration
}
if test.newObjectExpected == nil { if test.newObjectExpected == nil {
if newObjectGot != nil { if newObjectGot != nil {
t.Errorf("Expected a nil object, but got: %#v", newObjectGot) t.Errorf("Expected a nil object, but got: %#v", newObjectGot)
@ -308,24 +314,42 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
removeExpected bool removeExpected bool
}{ }{
{ {
name: "priority level configuration does not exist", name: "no priority level configuration objects exist",
bootstrapName: "pl1", bootstrapName: "pl1",
current: nil, current: nil,
}, },
{ {
name: "priority level configuration exists, auto update is enabled", name: "priority level configuration not wanted, auto update is enabled",
bootstrapName: "pl1", bootstrapName: "pl0",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
removeExpected: true, removeExpected: true,
}, },
{ {
name: "priority level configuration exists, auto update is disabled", name: "priority level configuration not wanted, auto update is disabled",
bootstrapName: "pl0",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(),
removeExpected: false,
},
{
name: "priority level configuration not wanted, the auto-update annotation is malformed",
bootstrapName: "pl0",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false,
},
{
name: "priority level configuration wanted, auto update is enabled",
bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
removeExpected: false,
},
{
name: "priority level configuration wanted, auto update is disabled",
bootstrapName: "pl1", bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(),
removeExpected: false, removeExpected: false,
}, },
{ {
name: "priority level configuration exists, the auto-update annotation is malformed", name: "priority level configuration wanted, the auto-update annotation is malformed",
bootstrapName: "pl1", bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false, removeExpected: false,
@ -341,8 +365,9 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
indexer.Add(test.current) indexer.Add(test.current)
} }
remover := NewPriorityLevelRemover(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)) boot := newPLConfiguration(test.bootstrapName).Object()
err := remover.RemoveAutoUpdateEnabledObjects([]string{test.bootstrapName}) boots := WrapBootstrapPriorityLevelConfigurations(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer), []*flowcontrolv1beta3.PriorityLevelConfiguration{boot})
err := RemoveUnwantedObjects(context.Background(), boots)
if err != nil { if err != nil {
t.Fatalf("Expected no error, but got: %v", err) t.Fatalf("Expected no error, but got: %v", err)
} }
@ -350,7 +375,7 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
if test.current == nil { if test.current == nil {
return return
} }
_, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{}) _, err = client.Get(context.TODO(), test.current.Name, metav1.GetOptions{})
switch { switch {
case test.removeExpected: case test.removeExpected:
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
@ -365,85 +390,6 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
} }
} }
func TestGetPriorityLevelRemoveCandidate(t *testing.T) {
tests := []struct {
name string
current []*flowcontrolv1beta3.PriorityLevelConfiguration
bootstrap []*flowcontrolv1beta3.PriorityLevelConfiguration
expected []string
}{
{
name: "no object has been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{},
},
{
name: "bootstrap is empty, all current objects with the annotation should be candidates",
bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{},
current: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").Object(),
},
expected: []string{"pl1", "pl2"},
},
{
name: "object(s) have been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{"pl2", "pl3"},
},
{
name: "object(s) without the annotation key are ignored",
bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").Object(),
newPLConfiguration("pl3").Object(),
},
expected: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for i := range test.current {
indexer.Add(test.current[i])
}
lister := flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)
removeListGot, err := GetPriorityLevelRemoveCandidates(lister, test.bootstrap)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool {
return a < b
})) {
t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot))
}
})
}
}
type plBuilder struct { type plBuilder struct {
object *flowcontrolv1beta3.PriorityLevelConfiguration object *flowcontrolv1beta3.PriorityLevelConfiguration
} }

View File

@ -17,26 +17,25 @@ limitations under the License.
package ensurer package ensurer
import ( import (
"errors" "context"
"fmt" "fmt"
"strconv" "strconv"
"github.com/google/go-cmp/cmp"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"github.com/google/go-cmp/cmp"
) )
const ( const (
fieldManager = "api-priority-and-fairness-config-producer-v1" fieldManager = "api-priority-and-fairness-config-producer-v1"
) )
// ensureStrategy provides a strategy for ensuring apf bootstrap configurationWrapper. // EnsureStrategy provides a maintenance strategy for APF configuration objects.
// We have two types of configurationWrapper objects: // We have two types of strategy, corresponding to the two types of config objetcs:
// //
// - mandatory: the mandatory configurationWrapper objects are about ensuring that the P&F // - mandatory: the mandatory configurationWrapper objects are about ensuring that the P&F
// system itself won't crash; we have to be sure there's 'catch-all' place for // system itself won't crash; we have to be sure there's 'catch-all' place for
@ -45,58 +44,67 @@ const (
// //
// - suggested: additional configurationWrapper objects for initial behavior. // - suggested: additional configurationWrapper objects for initial behavior.
// the cluster operators have an option to edit or delete these configurationWrapper objects. // the cluster operators have an option to edit or delete these configurationWrapper objects.
type ensureStrategy interface { type EnsureStrategy interface {
// Name of the strategy, for now we have two: 'mandatory' and 'suggested'. // Name of the strategy, for now we have two: 'mandatory' and 'suggested'.
// This comes handy in logging. // This comes handy in logging.
Name() string Name() string
// ShouldUpdate accepts the current and the bootstrap configuration and determines // ShouldUpdate accepts a pair of the current and the bootstrap configuration and determines
// whether an update is necessary. // whether an update is necessary.
// current is the existing in-cluster configuration object. // current is the existing in-cluster configuration object.
// bootstrap is the configuration the kube-apiserver maintains in-memory. // bootstrap is the configuration the kube-apiserver maintains in-memory.
// //
// revised: the new object represents the new configuration to be stored in-cluster.
// ok: true if auto update is required, otherwise false // ok: true if auto update is required, otherwise false
// object: the new object represents the new configuration to be stored in-cluster.
// err: err is set when the function runs into an error and can not // err: err is set when the function runs into an error and can not
// determine if auto update is needed. // determine if auto update is needed.
ShouldUpdate(current, bootstrap configurationObject) (object runtime.Object, ok bool, err error) ShouldUpdate(wantAndHave) (revised updatable, ok bool, err error)
} }
// this internal interface provides abstraction for dealing with the `Spec` // BootstrapObjects is a generic interface to a list of bootstrap objects bound up with the relevant operations on them.
// of both 'FlowSchema' and 'PriorityLevelConfiguration' objects. // The binding makes it unnecessary to have any type casts.
// Since the ensure logic for both types is common, we use a few internal interfaces // A bootstrap object is a mandatory or suggested config object,
// to abstract out the differences of these two types. // with the spec that the code is built to provide.
type specCopier interface { type BootstrapObjects interface {
// HasSpecChanged returns true if the spec of both the bootstrap and typeName() string // the Kind of the objects
// the current configuration object is same, otherwise false. len() int // number of objects
HasSpecChanged(bootstrap, current runtime.Object) (bool, error) get(int) bootstrapObject // extract one object, origin 0
getExistingObjects() ([]deletable, error) // returns all the APF config objects that exist at the moment
// CopySpec makes a deep copy the spec of the bootstrap object
// and copies it to that of the current object.
// CopySpec assumes that the current object is safe to mutate, so it
// rests with the caller to make a deep copy of the current.
CopySpec(bootstrap, current runtime.Object) error
} }
// this internal interface provides abstraction for CRUD operation // deletable is an existing config object and it supports the delete operation
// related to both 'FlowSchema' and 'PriorityLevelConfiguration' objects. type deletable interface {
// Since the ensure logic for both types is common, we use a few internal interfaces configurationObject
// to abstract out the differences of these two types. delete(context.Context) error // delete the object. TODO: make conditional on ResouceVersion
type configurationClient interface {
Create(object runtime.Object) (runtime.Object, error)
Update(object runtime.Object) (runtime.Object, error)
Get(name string) (configurationObject, error)
Delete(name string) error
} }
type configurationWrapper interface { // bootstrapObject is a single bootstrap object.
// TypeName returns the type of the configuration that this interface deals with. // Its spec is what the code provides.
// We use it to log the type name of the configuration object being ensured. type bootstrapObject interface {
// It is either 'PriorityLevelConfiguration' or 'FlowSchema' typeName() string // the Kind of the object
TypeName() string getName() string // the object's name
create(context.Context) error // request the server to create the object
getCurrent() (wantAndHave, error) // pair up with the object as it currently exists
}
configurationClient // wantAndHave is a pair of versions of an APF config object.
specCopier // The "want" has the spec that the code provides.
// The "have" is what came from the server.
type wantAndHave interface {
getWant() configurationObject
getHave() configurationObject
specsDiffer() bool
// copyHave returns a copy of the "have" version,
// optionally with spec replaced by the spec from "want".
copyHave(specFromWant bool) updatable
}
// updatable is an APF config object that can be written back to the apiserver
type updatable interface {
configurationObject
update(context.Context) error
} }
// A convenient wrapper interface that is used by the ensure logic. // A convenient wrapper interface that is used by the ensure logic.
@ -105,17 +113,17 @@ type configurationObject interface {
runtime.Object runtime.Object
} }
func newSuggestedEnsureStrategy(copier specCopier) ensureStrategy { // NewSuggestedEnsureStrategy returns an EnsureStrategy for suggested config objects
func NewSuggestedEnsureStrategy() EnsureStrategy {
return &strategy{ return &strategy{
copier: copier,
alwaysAutoUpdateSpec: false, alwaysAutoUpdateSpec: false,
name: "suggested", name: "suggested",
} }
} }
func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy { // NewMandatoryEnsureStrategy returns an EnsureStrategy for mandatory config objects
func NewMandatoryEnsureStrategy() EnsureStrategy {
return &strategy{ return &strategy{
copier: copier,
alwaysAutoUpdateSpec: true, alwaysAutoUpdateSpec: true,
name: "mandatory", name: "mandatory",
} }
@ -123,7 +131,6 @@ func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy {
// auto-update strategy for the configuration objects // auto-update strategy for the configuration objects
type strategy struct { type strategy struct {
copier specCopier
alwaysAutoUpdateSpec bool alwaysAutoUpdateSpec bool
name string name string
} }
@ -132,8 +139,10 @@ func (s *strategy) Name() string {
return s.name return s.name
} }
func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime.Object, bool, error) { func (s *strategy) ShouldUpdate(wah wantAndHave) (updatable, bool, error) {
if current == nil || bootstrap == nil { current := wah.getHave()
if current == nil {
return nil, false, nil return nil, false, nil
} }
@ -143,39 +152,23 @@ func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime
} }
updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec) updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec)
var specChanged bool specChanged := autoUpdateSpec && wah.specsDiffer()
if autoUpdateSpec {
changed, err := s.copier.HasSpecChanged(bootstrap, current)
if err != nil {
return nil, false, fmt.Errorf("failed to compare spec - %w", err)
}
specChanged = changed
}
if !(updateAnnotation || specChanged) { if !(updateAnnotation || specChanged) {
// the annotation key is up to date and the spec has not changed, no update is necessary // the annotation key is up to date and the spec has not changed, no update is necessary
return nil, false, nil return nil, false, nil
} }
// if we are here, either we need to update the annotation key or the spec. revised := wah.copyHave(specChanged)
copy, ok := current.DeepCopyObject().(configurationObject)
if !ok {
// we should never be here
return nil, false, errors.New("incompatible object type")
}
if updateAnnotation { if updateAnnotation {
setAutoUpdateAnnotation(copy, autoUpdateSpec) setAutoUpdateAnnotation(revised, autoUpdateSpec)
}
if specChanged {
s.copier.CopySpec(bootstrap, copy)
} }
return copy, true, nil return revised, true, nil
} }
// shouldUpdateSpec inspects the auto-update annotation key and generation field to determine // shouldUpdateSpec inspects the auto-update annotation key and generation field to determine
// whether the configurationWrapper object should be auto-updated. // whether the config object should be auto-updated.
func shouldUpdateSpec(accessor metav1.Object) bool { func shouldUpdateSpec(accessor metav1.Object) bool {
value, _ := accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] value, _ := accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey]
if autoUpdate, err := strconv.ParseBool(value); err == nil { if autoUpdate, err := strconv.ParseBool(value); err == nil {
@ -215,130 +208,130 @@ func setAutoUpdateAnnotation(accessor metav1.Object, autoUpdate bool) {
accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate) accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate)
} }
// ensureConfiguration ensures the boostrap configurationWrapper on the cluster based on the specified strategy. // EnsureConfigurations applies the given maintenance strategy to the given objects.
func ensureConfiguration(wrapper configurationWrapper, strategy ensureStrategy, bootstrap configurationObject) error { // At the first error, if any, it stops and returns that error.
name := bootstrap.GetName() func EnsureConfigurations(ctx context.Context, boots BootstrapObjects, strategy EnsureStrategy) error {
len := boots.len()
for i := 0; i < len; i++ {
bo := boots.get(i)
err := EnsureConfiguration(ctx, bo, strategy)
if err != nil {
return err
}
}
return nil
}
// EnsureConfiguration applies the given maintenance strategy to the given object.
func EnsureConfiguration(ctx context.Context, bootstrap bootstrapObject, strategy EnsureStrategy) error {
name := bootstrap.getName()
configurationType := strategy.Name() configurationType := strategy.Name()
var current configurationObject var wah wantAndHave
var err error var err error
for { for {
current, err = wrapper.Get(bootstrap.GetName()) wah, err = bootstrap.getCurrent()
if err == nil { if err == nil {
break break
} }
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
} }
// we always re-create a missing configuration object // we always re-create a missing configuration object
if _, err = wrapper.Create(bootstrap); err == nil { if err = bootstrap.create(ctx); err == nil {
klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", bootstrap.typeName()), "type", configurationType, "name", name)
return nil return nil
} }
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("cannot create %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
} }
klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", bootstrap.typeName()), "type", configurationType, "name", name)
} }
klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", bootstrap.typeName()), "type", configurationType, "name", name)
newObject, update, err := strategy.ShouldUpdate(current, bootstrap) newObject, update, err := strategy.ShouldUpdate(wah)
if err != nil { if err != nil {
return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
} }
if !update { if !update {
if klogV := klog.V(5); klogV.Enabled() { if klogV := klog.V(5); klogV.Enabled() {
klogV.InfoS("No update required", "wrapper", wrapper.TypeName(), "type", configurationType, "name", name, klogV.InfoS("No update required", "wrapper", bootstrap.typeName(), "type", configurationType, "name", name,
"diff", cmp.Diff(current, bootstrap)) "diff", cmp.Diff(wah.getHave(), wah.getWant()))
} }
return nil return nil
} }
if _, err = wrapper.Update(newObject); err == nil { if err = newObject.update(ctx); err == nil {
klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, newObject)) klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", bootstrap.typeName(), configurationType, name, cmp.Diff(wah.getHave(), wah.getWant()))
return nil return nil
} }
if apierrors.IsConflict(err) { if apierrors.IsConflict(err) {
klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", bootstrap.typeName()), "type", configurationType, "name", name)
return nil return nil
} }
return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
} }
// removeAutoUpdateEnabledConfiguration makes an attempt to remove the given // RemoveUnwantedObjects attempts to delete the configuration objects
// configuration object if automatic update of the spec is enabled for this object. // that exist, are annotated `apf.kubernetes.io/autoupdate-spec=true`, and do not
func removeAutoUpdateEnabledConfiguration(wrapper configurationWrapper, name string) error { // have a name in the given set. A refusal due to concurrent update is logged
current, err := wrapper.Get(name) // and not considered an error; the object will be reconsidered later.
func RemoveUnwantedObjects(ctx context.Context, boots BootstrapObjects) error {
current, err := boots.getExistingObjects()
if err != nil { if err != nil {
if apierrors.IsNotFound(err) { return err
return nil
} }
wantedNames := namesOfBootstrapObjects(boots)
return fmt.Errorf("failed to retrieve the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err) for _, object := range current {
name := object.GetName()
if wantedNames.Has(name) {
continue
} }
var value string
value := current.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] var ok, autoUpdate bool
autoUpdate, err := strconv.ParseBool(value) var err error
if err != nil { if value, ok = object.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey]; !ok {
klog.ErrorS(err, fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name)
// This may need manual intervention, in case the annotation value is malformed,
// so don't return an error, that might trigger futile retry loop.
return nil
}
if !autoUpdate {
klog.V(5).InfoS(fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name)
return nil
}
if err := wrapper.Delete(name); err != nil {
if apierrors.IsNotFound(err) {
klog.V(5).InfoS(fmt.Sprintf("Something concurrently deleted the %s", wrapper.TypeName()), "name", name)
return nil
}
return fmt.Errorf("failed to delete the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err)
}
klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the %s", wrapper.TypeName()), "name", name)
return nil
}
// getDanglingBootstrapObjectNames returns a list of names of bootstrap
// configuration objects that are potentially candidates for deletion from
// the cluster, given a set of bootstrap and current configuration.
// - bootstrap: a set of hard coded configuration kube-apiserver maintains in-memory.
// - current: a set of configuration objects that exist on the cluster
//
// Any object present in current is added to the list if both a and b are true:
//
// a. the object in current is missing from the bootstrap configuration
// b. the object has the designated auto-update annotation key
//
// This function shares the common logic for both FlowSchema and
// PriorityLevelConfiguration type and hence it accepts metav1.Object only.
func getDanglingBootstrapObjectNames(bootstrap sets.String, current []metav1.Object) []string {
if len(current) == 0 {
return nil
}
candidates := make([]string, 0)
for i := range current {
object := current[i]
if _, ok := object.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey]; !ok {
// the configuration object does not have the annotation key, // the configuration object does not have the annotation key,
// it's probably a user defined configuration object, // it's probably a user defined configuration object,
// so we can skip it. // so we can skip it.
klog.V(5).InfoS("Skipping deletion of APF object with no "+flowcontrolv1beta3.AutoUpdateAnnotationKey+" annotation", "name", name)
continue continue
} }
autoUpdate, err = strconv.ParseBool(value)
if err != nil {
// Log this because it is not an expected situation.
klog.V(4).InfoS("Skipping deletion of APF object with malformed "+flowcontrolv1beta3.AutoUpdateAnnotationKey+" annotation", "name", name, "annotationValue", value, "parseError", err)
continue
}
if !autoUpdate {
klog.V(5).InfoS("Skipping deletion of APF object with "+flowcontrolv1beta3.AutoUpdateAnnotationKey+"=false annotation", "name", name)
continue
}
// TODO: expectedResourceVersion := object.GetResourceVersion()
err = object.delete(ctx /* TODO: expectedResourceVersion */)
if err == nil {
klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the unwanted %s", boots.typeName()), "name", name)
continue
}
if apierrors.IsNotFound(err) {
klog.V(5).InfoS("Unwanted APF object was concurrently deleted", "name", name)
} else {
return fmt.Errorf("failed to delete unwatned APF object %q - %w", name, err)
}
}
return nil
}
if _, ok := bootstrap[object.GetName()]; !ok { func namesOfBootstrapObjects(bos BootstrapObjects) sets.String {
candidates = append(candidates, object.GetName()) names := sets.NewString()
len := bos.len()
for i := 0; i < len; i++ {
bo := bos.get(i)
names.Insert(bo.getName())
} }
} return names
return candidates
} }

View File

@ -141,6 +141,7 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo
return fmt.Errorf("failed to initialize clientset for APF - %w", err) return fmt.Errorf("failed to initialize clientset for APF - %w", err)
} }
err = func() error {
// get a derived context that gets cancelled after 5m or // get a derived context that gets cancelled after 5m or
// when the StopCh gets closed, whichever happens first. // when the StopCh gets closed, whichever happens first.
ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute) ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute)
@ -154,23 +155,29 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo
ctx, ctx,
time.Second, time.Second,
func(context.Context) (bool, error) { func(context.Context) (bool, error) {
if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil { if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
return false, nil return false, nil
} }
return true, nil return true, nil
}) })
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize APF bootstrap configuration") return fmt.Errorf("unable to initialize APF bootstrap configuration: %w", err)
}
return nil
}()
if err != nil {
return err
} }
// we have successfully initialized the bootstrap configuration, now we // we have successfully initialized the bootstrap configuration, now we
// spin up a goroutine which reconciles the bootstrap configuration periodically. // spin up a goroutine which reconciles the bootstrap configuration periodically.
go func() { go func() {
ctx := wait.ContextForChannel(hookContext.StopCh)
wait.PollImmediateUntil( wait.PollImmediateUntil(
time.Minute, time.Minute,
func() (bool, error) { func() (bool, error) {
if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil { if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
} }
// always auto update both suggested and mandatory configuration // always auto update both suggested and mandatory configuration
@ -182,79 +189,64 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo
return nil return nil
} }
func ensure(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { func ensure(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
if err := ensureSuggestedConfiguration(clientset, fsLister, plcLister); err != nil {
if err := ensureSuggestedConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
// We should not attempt creation of mandatory objects if ensuring the suggested // We should not attempt creation of mandatory objects if ensuring the suggested
// configuration resulted in an error. // configuration resulted in an error.
// This only happens when the stop channel is closed. // This only happens when the stop channel is closed.
return fmt.Errorf("failed ensuring suggested settings - %w", err) return fmt.Errorf("failed ensuring suggested settings - %w", err)
} }
if err := ensureMandatoryConfiguration(clientset, fsLister, plcLister); err != nil { if err := ensureMandatoryConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
return fmt.Errorf("failed ensuring mandatory settings - %w", err) return fmt.Errorf("failed ensuring mandatory settings - %w", err)
} }
if err := removeDanglingBootstrapConfiguration(clientset, fsLister, plcLister); err != nil { if err := removeDanglingBootstrapConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
return fmt.Errorf("failed to delete removed settings - %w", err) return fmt.Errorf("failed to delete removed settings - %w", err)
} }
return nil return nil
} }
func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { func ensureSuggestedConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations(), plcLister) plcSuggesteds := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
if err := plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations); err != nil { if err := ensurer.EnsureConfigurations(ctx, plcSuggesteds, ensurer.NewSuggestedEnsureStrategy()); err != nil {
return err return err
} }
fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister) fsSuggesteds := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, flowcontrolbootstrap.SuggestedFlowSchemas)
return fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas) return ensurer.EnsureConfigurations(ctx, fsSuggesteds, ensurer.NewSuggestedEnsureStrategy())
} }
func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { func ensureMandatoryConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister) plcMandatories := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, flowcontrolbootstrap.MandatoryPriorityLevelConfigurations)
if err := fsEnsurer.Ensure(flowcontrolbootstrap.MandatoryFlowSchemas); err != nil { if err := ensurer.EnsureConfigurations(ctx, plcMandatories, ensurer.NewMandatoryEnsureStrategy()); err != nil {
return err return err
} }
plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations(), plcLister) fsMandatories := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, flowcontrolbootstrap.MandatoryFlowSchemas)
return plEnsurer.Ensure(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations) return ensurer.EnsureConfigurations(ctx, fsMandatories, ensurer.NewMandatoryEnsureStrategy())
} }
func removeDanglingBootstrapConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { func removeDanglingBootstrapConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
if err := removeDanglingBootstrapFlowSchema(clientset.FlowSchemas(), fsLister); err != nil { if err := removeDanglingBootstrapFlowSchema(ctx, clientset, fsLister); err != nil {
return err return err
} }
return removeDanglingBootstrapPriorityLevel(clientset.PriorityLevelConfigurations(), plcLister) return removeDanglingBootstrapPriorityLevel(ctx, clientset, plcLister)
} }
func removeDanglingBootstrapFlowSchema(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) error { func removeDanglingBootstrapFlowSchema(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister) error {
bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...) bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...)
candidates, err := ensurer.GetFlowSchemaRemoveCandidates(lister, bootstrap) fsBoots := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, bootstrap)
if err != nil { return ensurer.RemoveUnwantedObjects(ctx, fsBoots)
return err
}
if len(candidates) == 0 {
return nil
} }
fsRemover := ensurer.NewFlowSchemaRemover(client, lister) func removeDanglingBootstrapPriorityLevel(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
return fsRemover.RemoveAutoUpdateEnabledObjects(candidates)
}
func removeDanglingBootstrapPriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) error {
bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...) bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...)
candidates, err := ensurer.GetPriorityLevelRemoveCandidates(lister, bootstrap) plcBoots := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, bootstrap)
if err != nil { return ensurer.RemoveUnwantedObjects(ctx, plcBoots)
return err
}
if len(candidates) == 0 {
return nil
}
plRemover := ensurer.NewPriorityLevelRemover(client, lister)
return plRemover.RemoveAutoUpdateEnabledObjects(candidates)
} }
// contextFromChannelAndMaxWaitDuration returns a Context that is bound to the // contextFromChannelAndMaxWaitDuration returns a Context that is bound to the