Improve and simplify maintenance of APF bootstrap objects

Prepare to make deletion of unwanted object conditional on ResourceVersion.

Remove unnecessary split between finding unwanted objects and removing
them.

Remove unnecessary layers of indirection to reach constant logic.

Use interfaces to remove need for type assertions.

Threaded context into APF object maintenance

Note and respect immutability of desired bootstrap objects
This commit is contained in:
Mike Spreitzer 2022-01-21 17:52:27 -05:00
parent 7e25f1232a
commit 008576da07
6 changed files with 510 additions and 714 deletions

View File

@ -18,190 +18,128 @@ package ensurer
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
) )
var ( // WrapBootstrapFlowSchemas creates a generic representation of the given bootstrap objects bound with their operations
errObjectNotFlowSchema = errors.New("object is not a FlowSchema type") // Every object in `boots` is immutable.
) func WrapBootstrapFlowSchemas(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister, boots []*flowcontrolv1beta3.FlowSchema) BootstrapObjects {
return &bootstrapFlowSchemas{
// FlowSchemaEnsurer ensures the specified bootstrap configuration objects flowSchemaClient: flowSchemaClient{
type FlowSchemaEnsurer interface {
Ensure([]*flowcontrolv1beta3.FlowSchema) error
}
// FlowSchemaRemover is the interface that wraps the
// RemoveAutoUpdateEnabledObjects method.
//
// RemoveAutoUpdateEnabledObjects removes a set of bootstrap FlowSchema
// objects specified via their names. The function removes an object
// only if automatic update of the spec is enabled for it.
type FlowSchemaRemover interface {
RemoveAutoUpdateEnabledObjects([]string) error
}
// NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of suggested FlowSchema configuration objects.
func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client, client: client,
lister: lister, lister: lister},
} boots: boots,
return &fsEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper),
wrapper: wrapper,
} }
} }
// NewMandatoryFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that type flowSchemaClient struct {
// can be used to ensure a set of mandatory FlowSchema configuration objects.
func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client,
lister: lister,
}
return &fsEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
wrapper: wrapper,
}
}
// NewFlowSchemaRemover returns a FlowSchemaRemover instance that
// can be used to remove a set of FlowSchema configuration objects.
func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) FlowSchemaRemover {
return &fsEnsurer{
wrapper: &flowSchemaWrapper{
client: client,
lister: lister,
},
}
}
// GetFlowSchemaRemoveCandidates returns a list of FlowSchema object
// names that are candidates for deletion from the cluster.
// bootstrap: a set of hard coded FlowSchema configuration objects
// kube-apiserver maintains in-memory.
func GetFlowSchemaRemoveCandidates(lister flowcontrollisters.FlowSchemaLister, bootstrap []*flowcontrolv1beta3.FlowSchema) ([]string, error) {
fsList, err := lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list FlowSchema - %w", err)
}
bootstrapNames := sets.String{}
for i := range bootstrap {
bootstrapNames.Insert(bootstrap[i].GetName())
}
currentObjects := make([]metav1.Object, len(fsList))
for i := range fsList {
currentObjects[i] = fsList[i]
}
return getDanglingBootstrapObjectNames(bootstrapNames, currentObjects), nil
}
type fsEnsurer struct {
strategy ensureStrategy
wrapper configurationWrapper
}
func (e *fsEnsurer) Ensure(flowSchemas []*flowcontrolv1beta3.FlowSchema) error {
for _, flowSchema := range flowSchemas {
// This code gets called by different goroutines. To avoid race conditions when
// https://github.com/kubernetes/kubernetes/blob/330b5a2b8dbd681811cb8235947557c99dd8e593/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go#L221-L243
// temporarily modifies the TypeMeta, we have to make a copy here.
if err := ensureConfiguration(e.wrapper, e.strategy, flowSchema.DeepCopy()); err != nil {
return err
}
}
return nil
}
func (e *fsEnsurer) RemoveAutoUpdateEnabledObjects(flowSchemas []string) error {
for _, flowSchema := range flowSchemas {
if err := removeAutoUpdateEnabledConfiguration(e.wrapper, flowSchema); err != nil {
return err
}
}
return nil
}
// flowSchemaWrapper abstracts all FlowSchema specific logic, with this
// we can manage all boiler plate code in one place.
type flowSchemaWrapper struct {
client flowcontrolclient.FlowSchemaInterface client flowcontrolclient.FlowSchemaInterface
lister flowcontrollisters.FlowSchemaLister lister flowcontrollisters.FlowSchemaLister
} }
func (fs *flowSchemaWrapper) TypeName() string { type bootstrapFlowSchemas struct {
flowSchemaClient
// Every member is a pointer to immutable content
boots []*flowcontrolv1beta3.FlowSchema
}
func (*flowSchemaClient) typeName() string {
return "FlowSchema" return "FlowSchema"
} }
func (fs *flowSchemaWrapper) Create(object runtime.Object) (runtime.Object, error) { func (boots *bootstrapFlowSchemas) len() int {
fsObject, ok := object.(*flowcontrolv1beta3.FlowSchema) return len(boots.boots)
if !ok {
return nil, errObjectNotFlowSchema
} }
return fs.client.Create(context.TODO(), fsObject, metav1.CreateOptions{FieldManager: fieldManager}) func (boots *bootstrapFlowSchemas) get(i int) bootstrapObject {
return &bootstrapFlowSchema{
flowSchemaClient: &boots.flowSchemaClient,
bootstrap: boots.boots[i],
}
} }
func (fs *flowSchemaWrapper) Update(object runtime.Object) (runtime.Object, error) { func (boots *bootstrapFlowSchemas) getExistingObjects() ([]deletable, error) {
fsObject, ok := object.(*flowcontrolv1beta3.FlowSchema) objs, err := boots.lister.List(labels.Everything())
if !ok { if err != nil {
return nil, errObjectNotFlowSchema return nil, fmt.Errorf("failed to list FlowSchema objects - %w", err)
}
dels := make([]deletable, len(objs))
for i, obj := range objs {
dels[i] = &deletableFlowSchema{
FlowSchema: obj,
client: boots.client,
}
}
return dels, nil
} }
return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager}) type bootstrapFlowSchema struct {
*flowSchemaClient
// points to immutable contnet
bootstrap *flowcontrolv1beta3.FlowSchema
} }
func (fs *flowSchemaWrapper) Get(name string) (configurationObject, error) { func (boot *bootstrapFlowSchema) getName() string {
return fs.lister.Get(name) return boot.bootstrap.Name
} }
func (fs *flowSchemaWrapper) Delete(name string) error { func (boot *bootstrapFlowSchema) create(ctx context.Context) error {
return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{}) // Copy the object here because the Encoder in the client code may modify the object; see
// https://github.com/kubernetes/kubernetes/pull/117107
// and WithVersionEncoder in apimachinery/pkg/runtime/helper.go.
_, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager})
return err
} }
func (fs *flowSchemaWrapper) CopySpec(bootstrap, current runtime.Object) error { func (boot *bootstrapFlowSchema) getCurrent() (wantAndHave, error) {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.FlowSchema) current, err := boot.lister.Get(boot.bootstrap.Name)
if !ok { if err != nil {
return errObjectNotFlowSchema return nil, err
} }
currentFS, ok := current.(*flowcontrolv1beta3.FlowSchema) return &wantAndHaveFlowSchema{
if !ok { client: boot.client,
return errObjectNotFlowSchema want: boot.bootstrap,
have: current,
}, nil
} }
specCopy := bootstrapFS.Spec.DeepCopy() type wantAndHaveFlowSchema struct {
currentFS.Spec = *specCopy client flowcontrolclient.FlowSchemaInterface
return nil want *flowcontrolv1beta3.FlowSchema
have *flowcontrolv1beta3.FlowSchema
} }
func (fs *flowSchemaWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) { func (wah *wantAndHaveFlowSchema) getWant() configurationObject {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.FlowSchema) return wah.want
if !ok {
return false, errObjectNotFlowSchema
}
currentFS, ok := current.(*flowcontrolv1beta3.FlowSchema)
if !ok {
return false, errObjectNotFlowSchema
} }
return flowSchemaSpecChanged(bootstrapFS, currentFS), nil func (wah *wantAndHaveFlowSchema) getHave() configurationObject {
return wah.have
}
func (wah *wantAndHaveFlowSchema) copyHave(specFromWant bool) updatable {
copy := wah.have.DeepCopy()
if specFromWant {
copy.Spec = *wah.want.Spec.DeepCopy()
}
return &updatableFlowSchema{
FlowSchema: copy,
client: wah.client,
}
}
func (wah *wantAndHaveFlowSchema) specsDiffer() bool {
return flowSchemaSpecChanged(wah.want, wah.have)
} }
func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool { func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool {
@ -209,3 +147,23 @@ func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool
flowcontrolapisv1beta3.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema) flowcontrolapisv1beta3.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec) return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec)
} }
type updatableFlowSchema struct {
*flowcontrolv1beta3.FlowSchema
client flowcontrolclient.FlowSchemaInterface
}
func (u *updatableFlowSchema) update(ctx context.Context) error {
_, err := u.client.Update(ctx, u.FlowSchema, metav1.UpdateOptions{FieldManager: fieldManager})
return err
}
type deletableFlowSchema struct {
*flowcontrolv1beta3.FlowSchema
client flowcontrolclient.FlowSchemaInterface
}
func (dbl *deletableFlowSchema) delete(ctx context.Context /* TODO: resourceVersion string */) error {
// return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}})
return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{})
}

View File

@ -26,20 +26,23 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func init() {
klog.InitFlags(nil)
}
func TestEnsureFlowSchema(t *testing.T) { func TestEnsureFlowSchema(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
strategy func(flowcontrolclient.FlowSchemaInterface, flowcontrollisters.FlowSchemaLister) FlowSchemaEnsurer strategy func() EnsureStrategy
current *flowcontrolv1beta3.FlowSchema current *flowcontrolv1beta3.FlowSchema
bootstrap *flowcontrolv1beta3.FlowSchema bootstrap *flowcontrolv1beta3.FlowSchema
expected *flowcontrolv1beta3.FlowSchema expected *flowcontrolv1beta3.FlowSchema
@ -47,21 +50,21 @@ func TestEnsureFlowSchema(t *testing.T) {
// for suggested configurations // for suggested configurations
{ {
name: "suggested flow schema does not exist - the object should always be re-created", name: "suggested flow schema does not exist - the object should always be re-created",
strategy: NewSuggestedFlowSchemaEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: nil, current: nil,
expected: newFlowSchema("fs1", "pl1", 100).Object(), expected: newFlowSchema("fs1", "pl1", 100).Object(),
}, },
{ {
name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated", name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated",
strategy: NewSuggestedFlowSchemaEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
}, },
{ {
name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated", name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: NewSuggestedFlowSchemaEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
@ -70,21 +73,21 @@ func TestEnsureFlowSchema(t *testing.T) {
// for mandatory configurations // for mandatory configurations
{ {
name: "mandatory flow schema does not exist - new object should be created", name: "mandatory flow schema does not exist - new object should be created",
strategy: NewMandatoryFlowSchemaEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
current: nil, current: nil,
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
}, },
{ {
name: "mandatory flow schema exists, annotation is missing - annotation should be added", name: "mandatory flow schema exists, annotation is missing - annotation should be added",
strategy: NewMandatoryFlowSchemaEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 100).Object(), current: newFlowSchema("fs1", "pl1", 100).Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
}, },
{ {
name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated", name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated",
strategy: NewMandatoryFlowSchemaEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(), expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
@ -100,9 +103,9 @@ func TestEnsureFlowSchema(t *testing.T) {
indexer.Add(test.current) indexer.Add(test.current)
} }
ensurer := test.strategy(client, flowcontrollisters.NewFlowSchemaLister(indexer)) boots := WrapBootstrapFlowSchemas(client, flowcontrollisters.NewFlowSchemaLister(indexer), []*flowcontrolv1beta3.FlowSchema{test.bootstrap})
strategy := test.strategy()
err := ensurer.Ensure([]*flowcontrolv1beta3.FlowSchema{test.bootstrap}) err := EnsureConfigurations(context.Background(), boots, strategy)
if err != nil { if err != nil {
t.Fatalf("Expected no error, but got: %v", err) t.Fatalf("Expected no error, but got: %v", err)
} }
@ -207,12 +210,19 @@ func TestSuggestedFSEnsureStrategy_ShouldUpdate(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
strategy := newSuggestedEnsureStrategy(&flowSchemaWrapper{}) wah := &wantAndHaveFlowSchema{
newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) want: test.bootstrap,
have: test.current,
}
strategy := NewSuggestedEnsureStrategy()
updatableGot, updateGot, err := strategy.ShouldUpdate(wah)
if err != nil { if err != nil {
t.Errorf("Expected no error, but got: %v", err) t.Errorf("Expected no error, but got: %v", err)
} }
var newObjectGot *flowcontrolv1beta3.FlowSchema
if updatableGot != nil {
newObjectGot = updatableGot.(*updatableFlowSchema).FlowSchema
}
if test.newObjectExpected == nil { if test.newObjectExpected == nil {
if newObjectGot != nil { if newObjectGot != nil {
t.Errorf("Expected a nil object, but got: %#v", newObjectGot) t.Errorf("Expected a nil object, but got: %#v", newObjectGot)
@ -288,24 +298,42 @@ func TestRemoveFlowSchema(t *testing.T) {
removeExpected bool removeExpected bool
}{ }{
{ {
name: "flow schema does not exist", name: "no flow schema objects exist",
bootstrapName: "fs1", bootstrapName: "fs1",
current: nil, current: nil,
}, },
{ {
name: "flow schema exists, auto update is enabled", name: "flow schema unwanted, auto update is enabled",
bootstrapName: "fs1", bootstrapName: "fs0",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
removeExpected: true, removeExpected: true,
}, },
{ {
name: "flow schema exists, auto update is disabled", name: "flow schema unwanted, auto update is disabled",
bootstrapName: "fs0",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
removeExpected: false,
},
{
name: "flow schema unwanted, the auto-update annotation is malformed",
bootstrapName: "fs0",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false,
},
{
name: "flow schema wanted, auto update is enabled",
bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
removeExpected: false,
},
{
name: "flow schema wanted, auto update is disabled",
bootstrapName: "fs1", bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
removeExpected: false, removeExpected: false,
}, },
{ {
name: "flow schema exists, the auto-update annotation is malformed", name: "flow schema wanted, the auto-update annotation is malformed",
bootstrapName: "fs1", bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false, removeExpected: false,
@ -320,9 +348,10 @@ func TestRemoveFlowSchema(t *testing.T) {
client.Create(context.TODO(), test.current, metav1.CreateOptions{}) client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current) indexer.Add(test.current)
} }
bootFS := newFlowSchema(test.bootstrapName, "pl", 100).Object()
boots := WrapBootstrapFlowSchemas(client, flowcontrollisters.NewFlowSchemaLister(indexer), []*flowcontrolv1beta3.FlowSchema{bootFS})
err := RemoveUnwantedObjects(context.Background(), boots)
remover := NewFlowSchemaRemover(client, flowcontrollisters.NewFlowSchemaLister(indexer))
err := remover.RemoveAutoUpdateEnabledObjects([]string{test.bootstrapName})
if err != nil { if err != nil {
t.Fatalf("Expected no error, but got: %v", err) t.Fatalf("Expected no error, but got: %v", err)
} }
@ -330,100 +359,21 @@ func TestRemoveFlowSchema(t *testing.T) {
if test.current == nil { if test.current == nil {
return return
} }
_, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{}) _, err = client.Get(context.TODO(), test.current.Name, metav1.GetOptions{})
switch { switch {
case test.removeExpected: case test.removeExpected:
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
t.Errorf("Expected error: %q, but got: %v", metav1.StatusReasonNotFound, err) t.Errorf("Expected error from Get after Delete: %q, but got: %v", metav1.StatusReasonNotFound, err)
} }
default: default:
if err != nil { if err != nil {
t.Errorf("Expected no error, but got: %v", err) t.Errorf("Expected no error from Get after Delete, but got: %v", err)
} }
} }
}) })
} }
} }
func TestGetFlowSchemaRemoveCandidate(t *testing.T) {
tests := []struct {
name string
current []*flowcontrolv1beta3.FlowSchema
bootstrap []*flowcontrolv1beta3.FlowSchema
expected []string
}{
{
name: "no object has been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{},
},
{
name: "bootstrap is empty, all current objects with the annotation should be candidates",
bootstrap: []*flowcontrolv1beta3.FlowSchema{},
current: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).Object(),
},
expected: []string{"fs1", "fs2"},
},
{
name: "object(s) have been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{"fs2", "fs3"},
},
{
name: "object(s) without the annotation key are ignored",
bootstrap: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).Object(),
newFlowSchema("fs3", "pl3", 300).Object(),
},
expected: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for i := range test.current {
indexer.Add(test.current[i])
}
lister := flowcontrollisters.NewFlowSchemaLister(indexer)
removeListGot, err := GetFlowSchemaRemoveCandidates(lister, test.bootstrap)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool {
return a < b
})) {
t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot))
}
})
}
}
type fsBuilder struct { type fsBuilder struct {
object *flowcontrolv1beta3.FlowSchema object *flowcontrolv1beta3.FlowSchema
} }

View File

@ -18,191 +18,128 @@ package ensurer
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
) )
var ( // WrapBootstrapPriorityLevelConfigurations creates a generic representation of the given bootstrap objects bound with their operations.
errObjectNotPriorityLevel = errors.New("object is not a PriorityLevelConfiguration type") // Every object in `boots` is immutable.
) func WrapBootstrapPriorityLevelConfigurations(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister, boots []*flowcontrolv1beta3.PriorityLevelConfiguration) BootstrapObjects {
return &bootstrapPriorityLevelConfigurations{
// PriorityLevelEnsurer ensures the specified bootstrap configuration objects priorityLevelConfigurationClient: priorityLevelConfigurationClient{
type PriorityLevelEnsurer interface {
Ensure([]*flowcontrolv1beta3.PriorityLevelConfiguration) error
}
// PriorityLevelRemover is the interface that wraps the
// RemoveAutoUpdateEnabledObjects method.
//
// RemoveAutoUpdateEnabledObjects removes a set of bootstrap
// PriorityLevelConfiguration objects specified via their names.
// The function removes an object only if automatic update
// of the spec is enabled for it.
type PriorityLevelRemover interface {
RemoveAutoUpdateEnabledObjects([]string) error
}
// NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that
// can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects.
func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{
client: client, client: client,
lister: lister, lister: lister},
} boots: boots,
return &plEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper),
wrapper: wrapper,
} }
} }
// NewMandatoryPriorityLevelEnsurer returns a PriorityLevelEnsurer instance that type priorityLevelConfigurationClient struct {
// can be used to ensure a set of mandatory PriorityLevelConfiguration configuration objects.
func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{
client: client,
lister: lister,
}
return &plEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
wrapper: wrapper,
}
}
// NewPriorityLevelRemover returns a PriorityLevelRemover instance that
// can be used to remove a set of PriorityLevelConfiguration configuration objects.
func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelRemover {
return &plEnsurer{
wrapper: &priorityLevelConfigurationWrapper{
client: client,
lister: lister,
},
}
}
// GetPriorityLevelRemoveCandidates returns a list of PriorityLevelConfiguration
// names that are candidates for removal from the cluster.
// bootstrap: a set of hard coded PriorityLevelConfiguration configuration
// objects kube-apiserver maintains in-memory.
func GetPriorityLevelRemoveCandidates(lister flowcontrollisters.PriorityLevelConfigurationLister, bootstrap []*flowcontrolv1beta3.PriorityLevelConfiguration) ([]string, error) {
plList, err := lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list PriorityLevelConfiguration - %w", err)
}
bootstrapNames := sets.String{}
for i := range bootstrap {
bootstrapNames.Insert(bootstrap[i].GetName())
}
currentObjects := make([]metav1.Object, len(plList))
for i := range plList {
currentObjects[i] = plList[i]
}
return getDanglingBootstrapObjectNames(bootstrapNames, currentObjects), nil
}
type plEnsurer struct {
strategy ensureStrategy
wrapper configurationWrapper
}
func (e *plEnsurer) Ensure(priorityLevels []*flowcontrolv1beta3.PriorityLevelConfiguration) error {
for _, priorityLevel := range priorityLevels {
// This code gets called by different goroutines. To avoid race conditions when
// https://github.com/kubernetes/kubernetes/blob/330b5a2b8dbd681811cb8235947557c99dd8e593/staging/src/k8s.io/apimachinery/pkg/runtime/helper.go#L221-L243
// temporarily modifies the TypeMeta, we have to make a copy here.
if err := ensureConfiguration(e.wrapper, e.strategy, priorityLevel.DeepCopy()); err != nil {
return err
}
}
return nil
}
func (e *plEnsurer) RemoveAutoUpdateEnabledObjects(priorityLevels []string) error {
for _, priorityLevel := range priorityLevels {
if err := removeAutoUpdateEnabledConfiguration(e.wrapper, priorityLevel); err != nil {
return err
}
}
return nil
}
// priorityLevelConfigurationWrapper abstracts all PriorityLevelConfiguration specific logic,
// with this we can manage all boiler plate code in one place.
type priorityLevelConfigurationWrapper struct {
client flowcontrolclient.PriorityLevelConfigurationInterface client flowcontrolclient.PriorityLevelConfigurationInterface
lister flowcontrollisters.PriorityLevelConfigurationLister lister flowcontrollisters.PriorityLevelConfigurationLister
} }
func (fs *priorityLevelConfigurationWrapper) TypeName() string { type bootstrapPriorityLevelConfigurations struct {
priorityLevelConfigurationClient
// Every member is a pointer to immutable content
boots []*flowcontrolv1beta3.PriorityLevelConfiguration
}
func (*priorityLevelConfigurationClient) typeName() string {
return "PriorityLevelConfiguration" return "PriorityLevelConfiguration"
} }
func (fs *priorityLevelConfigurationWrapper) Create(object runtime.Object) (runtime.Object, error) { func (boots *bootstrapPriorityLevelConfigurations) len() int {
plObject, ok := object.(*flowcontrolv1beta3.PriorityLevelConfiguration) return len(boots.boots)
if !ok {
return nil, errObjectNotPriorityLevel
} }
return fs.client.Create(context.TODO(), plObject, metav1.CreateOptions{FieldManager: fieldManager}) func (boots *bootstrapPriorityLevelConfigurations) get(i int) bootstrapObject {
return &bootstrapPriorityLevelConfiguration{
priorityLevelConfigurationClient: &boots.priorityLevelConfigurationClient,
bootstrap: boots.boots[i],
}
} }
func (fs *priorityLevelConfigurationWrapper) Update(object runtime.Object) (runtime.Object, error) { func (boots *bootstrapPriorityLevelConfigurations) getExistingObjects() ([]deletable, error) {
fsObject, ok := object.(*flowcontrolv1beta3.PriorityLevelConfiguration) objs, err := boots.lister.List(labels.Everything())
if !ok { if err != nil {
return nil, errObjectNotPriorityLevel return nil, fmt.Errorf("failed to list PriorityLevelConfiguration objects - %w", err)
}
dels := make([]deletable, len(objs))
for i, obj := range objs {
dels[i] = &deletablePriorityLevelConfiguration{
PriorityLevelConfiguration: obj,
client: boots.client,
}
}
return dels, nil
} }
return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager}) type bootstrapPriorityLevelConfiguration struct {
*priorityLevelConfigurationClient
// points to immutable content
bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration
} }
func (fs *priorityLevelConfigurationWrapper) Get(name string) (configurationObject, error) { func (boot *bootstrapPriorityLevelConfiguration) getName() string {
return fs.lister.Get(name) return boot.bootstrap.Name
} }
func (fs *priorityLevelConfigurationWrapper) Delete(name string) error { func (boot *bootstrapPriorityLevelConfiguration) create(ctx context.Context) error {
return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{}) // Copy the object here because the Encoder in the client code may modify the object; see
// https://github.com/kubernetes/kubernetes/pull/117107
// and WithVersionEncoder in apimachinery/pkg/runtime/helper.go.
_, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager})
return err
} }
func (fs *priorityLevelConfigurationWrapper) CopySpec(bootstrap, current runtime.Object) error { func (boot *bootstrapPriorityLevelConfiguration) getCurrent() (wantAndHave, error) {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.PriorityLevelConfiguration) current, err := boot.lister.Get(boot.bootstrap.Name)
if !ok { if err != nil {
return errObjectNotPriorityLevel return nil, err
} }
currentFS, ok := current.(*flowcontrolv1beta3.PriorityLevelConfiguration) return &wantAndHavePriorityLevelConfiguration{
if !ok { client: boot.client,
return errObjectNotPriorityLevel want: boot.bootstrap,
have: current,
}, nil
} }
specCopy := bootstrapFS.Spec.DeepCopy() type wantAndHavePriorityLevelConfiguration struct {
currentFS.Spec = *specCopy client flowcontrolclient.PriorityLevelConfigurationInterface
return nil want *flowcontrolv1beta3.PriorityLevelConfiguration
have *flowcontrolv1beta3.PriorityLevelConfiguration
} }
func (fs *priorityLevelConfigurationWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) { func (wah *wantAndHavePriorityLevelConfiguration) getWant() configurationObject {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta3.PriorityLevelConfiguration) return wah.want
if !ok {
return false, errObjectNotPriorityLevel
}
currentFS, ok := current.(*flowcontrolv1beta3.PriorityLevelConfiguration)
if !ok {
return false, errObjectNotPriorityLevel
} }
return priorityLevelSpecChanged(bootstrapFS, currentFS), nil func (wah *wantAndHavePriorityLevelConfiguration) getHave() configurationObject {
return wah.have
}
func (wah *wantAndHavePriorityLevelConfiguration) copyHave(specFromWant bool) updatable {
copy := wah.have.DeepCopy()
if specFromWant {
copy.Spec = *wah.want.Spec.DeepCopy()
}
return &updatablePriorityLevelConfiguration{
PriorityLevelConfiguration: copy,
client: wah.client,
}
}
func (wah *wantAndHavePriorityLevelConfiguration) specsDiffer() bool {
return priorityLevelSpecChanged(wah.want, wah.have)
} }
func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevelConfiguration) bool { func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevelConfiguration) bool {
@ -210,3 +147,23 @@ func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevel
flowcontrolapisv1beta3.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel) flowcontrolapisv1beta3.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel)
return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec) return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec)
} }
type updatablePriorityLevelConfiguration struct {
*flowcontrolv1beta3.PriorityLevelConfiguration
client flowcontrolclient.PriorityLevelConfigurationInterface
}
func (u *updatablePriorityLevelConfiguration) update(ctx context.Context) error {
_, err := u.client.Update(ctx, u.PriorityLevelConfiguration, metav1.UpdateOptions{FieldManager: fieldManager})
return err
}
type deletablePriorityLevelConfiguration struct {
*flowcontrolv1beta3.PriorityLevelConfiguration
client flowcontrolclient.PriorityLevelConfigurationInterface
}
func (dbl *deletablePriorityLevelConfiguration) delete(ctx context.Context /* resourceVersion string */) error {
// return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}})
return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{})
}

View File

@ -26,20 +26,18 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/fake"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3" flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3" flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
"k8s.io/utils/pointer" "k8s.io/utils/pointer"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
) )
func TestEnsurePriorityLevel(t *testing.T) { func TestEnsurePriorityLevel(t *testing.T) {
tests := []struct { tests := []struct {
name string name string
strategy func(flowcontrolclient.PriorityLevelConfigurationInterface, flowcontrollisters.PriorityLevelConfigurationLister) PriorityLevelEnsurer strategy func() EnsureStrategy
current *flowcontrolv1beta3.PriorityLevelConfiguration current *flowcontrolv1beta3.PriorityLevelConfiguration
bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration
expected *flowcontrolv1beta3.PriorityLevelConfiguration expected *flowcontrolv1beta3.PriorityLevelConfiguration
@ -47,21 +45,21 @@ func TestEnsurePriorityLevel(t *testing.T) {
// for suggested configurations // for suggested configurations
{ {
name: "suggested priority level configuration does not exist - the object should always be re-created", name: "suggested priority level configuration does not exist - the object should always be re-created",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(),
current: nil, current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithLimited(10).Object(),
}, },
{ {
name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated", name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
}, },
{ {
name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated", name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: NewSuggestedPriorityLevelEnsurerEnsurer, strategy: NewSuggestedEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
@ -70,21 +68,21 @@ func TestEnsurePriorityLevel(t *testing.T) {
// for mandatory configurations // for mandatory configurations
{ {
name: "mandatory priority level configuration does not exist - new object should be created", name: "mandatory priority level configuration does not exist - new object should be created",
strategy: NewMandatoryPriorityLevelEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
current: nil, current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(), expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
}, },
{ {
name: "mandatory priority level configuration exists, annotation is missing - annotation is added", name: "mandatory priority level configuration exists, annotation is missing - annotation is added",
strategy: NewMandatoryPriorityLevelEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithLimited(20).Object(), current: newPLConfiguration("pl1").WithLimited(20).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
}, },
{ {
name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated", name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated",
strategy: NewMandatoryPriorityLevelEnsurer, strategy: NewMandatoryEnsureStrategy,
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
@ -100,9 +98,10 @@ func TestEnsurePriorityLevel(t *testing.T) {
indexer.Add(test.current) indexer.Add(test.current)
} }
ensurer := test.strategy(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)) boots := WrapBootstrapPriorityLevelConfigurations(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer), []*flowcontrolv1beta3.PriorityLevelConfiguration{test.bootstrap})
strategy := test.strategy()
err := ensurer.Ensure([]*flowcontrolv1beta3.PriorityLevelConfiguration{test.bootstrap}) err := EnsureConfigurations(context.Background(), boots, strategy)
if err != nil { if err != nil {
t.Fatalf("Expected no error, but got: %v", err) t.Fatalf("Expected no error, but got: %v", err)
} }
@ -207,12 +206,19 @@ func TestSuggestedPLEnsureStrategy_ShouldUpdate(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
strategy := newSuggestedEnsureStrategy(&priorityLevelConfigurationWrapper{}) strategy := NewSuggestedEnsureStrategy()
newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) wah := &wantAndHavePriorityLevelConfiguration{
want: test.bootstrap,
have: test.current,
}
updatableGot, updateGot, err := strategy.ShouldUpdate(wah)
if err != nil { if err != nil {
t.Errorf("Expected no error, but got: %v", err) t.Errorf("Expected no error, but got: %v", err)
} }
var newObjectGot *flowcontrolv1beta3.PriorityLevelConfiguration
if updatableGot != nil {
newObjectGot = updatableGot.(*updatablePriorityLevelConfiguration).PriorityLevelConfiguration
}
if test.newObjectExpected == nil { if test.newObjectExpected == nil {
if newObjectGot != nil { if newObjectGot != nil {
t.Errorf("Expected a nil object, but got: %#v", newObjectGot) t.Errorf("Expected a nil object, but got: %#v", newObjectGot)
@ -308,24 +314,42 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
removeExpected bool removeExpected bool
}{ }{
{ {
name: "priority level configuration does not exist", name: "no priority level configuration objects exist",
bootstrapName: "pl1", bootstrapName: "pl1",
current: nil, current: nil,
}, },
{ {
name: "priority level configuration exists, auto update is enabled", name: "priority level configuration not wanted, auto update is enabled",
bootstrapName: "pl1", bootstrapName: "pl0",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
removeExpected: true, removeExpected: true,
}, },
{ {
name: "priority level configuration exists, auto update is disabled", name: "priority level configuration not wanted, auto update is disabled",
bootstrapName: "pl0",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(),
removeExpected: false,
},
{
name: "priority level configuration not wanted, the auto-update annotation is malformed",
bootstrapName: "pl0",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false,
},
{
name: "priority level configuration wanted, auto update is enabled",
bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
removeExpected: false,
},
{
name: "priority level configuration wanted, auto update is disabled",
bootstrapName: "pl1", bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(),
removeExpected: false, removeExpected: false,
}, },
{ {
name: "priority level configuration exists, the auto-update annotation is malformed", name: "priority level configuration wanted, the auto-update annotation is malformed",
bootstrapName: "pl1", bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false, removeExpected: false,
@ -341,8 +365,9 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
indexer.Add(test.current) indexer.Add(test.current)
} }
remover := NewPriorityLevelRemover(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)) boot := newPLConfiguration(test.bootstrapName).Object()
err := remover.RemoveAutoUpdateEnabledObjects([]string{test.bootstrapName}) boots := WrapBootstrapPriorityLevelConfigurations(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer), []*flowcontrolv1beta3.PriorityLevelConfiguration{boot})
err := RemoveUnwantedObjects(context.Background(), boots)
if err != nil { if err != nil {
t.Fatalf("Expected no error, but got: %v", err) t.Fatalf("Expected no error, but got: %v", err)
} }
@ -350,7 +375,7 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
if test.current == nil { if test.current == nil {
return return
} }
_, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{}) _, err = client.Get(context.TODO(), test.current.Name, metav1.GetOptions{})
switch { switch {
case test.removeExpected: case test.removeExpected:
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
@ -365,85 +390,6 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
} }
} }
func TestGetPriorityLevelRemoveCandidate(t *testing.T) {
tests := []struct {
name string
current []*flowcontrolv1beta3.PriorityLevelConfiguration
bootstrap []*flowcontrolv1beta3.PriorityLevelConfiguration
expected []string
}{
{
name: "no object has been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{},
},
{
name: "bootstrap is empty, all current objects with the annotation should be candidates",
bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{},
current: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").Object(),
},
expected: []string{"pl1", "pl2"},
},
{
name: "object(s) have been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{"pl2", "pl3"},
},
{
name: "object(s) without the annotation key are ignored",
bootstrap: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta3.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").Object(),
newPLConfiguration("pl3").Object(),
},
expected: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
for i := range test.current {
indexer.Add(test.current[i])
}
lister := flowcontrollisters.NewPriorityLevelConfigurationLister(indexer)
removeListGot, err := GetPriorityLevelRemoveCandidates(lister, test.bootstrap)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !cmp.Equal(test.expected, removeListGot, cmpopts.SortSlices(func(a string, b string) bool {
return a < b
})) {
t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot))
}
})
}
}
type plBuilder struct { type plBuilder struct {
object *flowcontrolv1beta3.PriorityLevelConfiguration object *flowcontrolv1beta3.PriorityLevelConfiguration
} }

View File

@ -17,26 +17,25 @@ limitations under the License.
package ensurer package ensurer
import ( import (
"errors" "context"
"fmt" "fmt"
"strconv" "strconv"
"github.com/google/go-cmp/cmp"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3" flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"github.com/google/go-cmp/cmp"
) )
const ( const (
fieldManager = "api-priority-and-fairness-config-producer-v1" fieldManager = "api-priority-and-fairness-config-producer-v1"
) )
// ensureStrategy provides a strategy for ensuring apf bootstrap configurationWrapper. // EnsureStrategy provides a maintenance strategy for APF configuration objects.
// We have two types of configurationWrapper objects: // We have two types of strategy, corresponding to the two types of config objetcs:
// //
// - mandatory: the mandatory configurationWrapper objects are about ensuring that the P&F // - mandatory: the mandatory configurationWrapper objects are about ensuring that the P&F
// system itself won't crash; we have to be sure there's 'catch-all' place for // system itself won't crash; we have to be sure there's 'catch-all' place for
@ -45,58 +44,67 @@ const (
// //
// - suggested: additional configurationWrapper objects for initial behavior. // - suggested: additional configurationWrapper objects for initial behavior.
// the cluster operators have an option to edit or delete these configurationWrapper objects. // the cluster operators have an option to edit or delete these configurationWrapper objects.
type ensureStrategy interface { type EnsureStrategy interface {
// Name of the strategy, for now we have two: 'mandatory' and 'suggested'. // Name of the strategy, for now we have two: 'mandatory' and 'suggested'.
// This comes handy in logging. // This comes handy in logging.
Name() string Name() string
// ShouldUpdate accepts the current and the bootstrap configuration and determines // ShouldUpdate accepts a pair of the current and the bootstrap configuration and determines
// whether an update is necessary. // whether an update is necessary.
// current is the existing in-cluster configuration object. // current is the existing in-cluster configuration object.
// bootstrap is the configuration the kube-apiserver maintains in-memory. // bootstrap is the configuration the kube-apiserver maintains in-memory.
// //
// revised: the new object represents the new configuration to be stored in-cluster.
// ok: true if auto update is required, otherwise false // ok: true if auto update is required, otherwise false
// object: the new object represents the new configuration to be stored in-cluster.
// err: err is set when the function runs into an error and can not // err: err is set when the function runs into an error and can not
// determine if auto update is needed. // determine if auto update is needed.
ShouldUpdate(current, bootstrap configurationObject) (object runtime.Object, ok bool, err error) ShouldUpdate(wantAndHave) (revised updatable, ok bool, err error)
} }
// this internal interface provides abstraction for dealing with the `Spec` // BootstrapObjects is a generic interface to a list of bootstrap objects bound up with the relevant operations on them.
// of both 'FlowSchema' and 'PriorityLevelConfiguration' objects. // The binding makes it unnecessary to have any type casts.
// Since the ensure logic for both types is common, we use a few internal interfaces // A bootstrap object is a mandatory or suggested config object,
// to abstract out the differences of these two types. // with the spec that the code is built to provide.
type specCopier interface { type BootstrapObjects interface {
// HasSpecChanged returns true if the spec of both the bootstrap and typeName() string // the Kind of the objects
// the current configuration object is same, otherwise false. len() int // number of objects
HasSpecChanged(bootstrap, current runtime.Object) (bool, error) get(int) bootstrapObject // extract one object, origin 0
getExistingObjects() ([]deletable, error) // returns all the APF config objects that exist at the moment
// CopySpec makes a deep copy the spec of the bootstrap object
// and copies it to that of the current object.
// CopySpec assumes that the current object is safe to mutate, so it
// rests with the caller to make a deep copy of the current.
CopySpec(bootstrap, current runtime.Object) error
} }
// this internal interface provides abstraction for CRUD operation // deletable is an existing config object and it supports the delete operation
// related to both 'FlowSchema' and 'PriorityLevelConfiguration' objects. type deletable interface {
// Since the ensure logic for both types is common, we use a few internal interfaces configurationObject
// to abstract out the differences of these two types. delete(context.Context) error // delete the object. TODO: make conditional on ResouceVersion
type configurationClient interface {
Create(object runtime.Object) (runtime.Object, error)
Update(object runtime.Object) (runtime.Object, error)
Get(name string) (configurationObject, error)
Delete(name string) error
} }
type configurationWrapper interface { // bootstrapObject is a single bootstrap object.
// TypeName returns the type of the configuration that this interface deals with. // Its spec is what the code provides.
// We use it to log the type name of the configuration object being ensured. type bootstrapObject interface {
// It is either 'PriorityLevelConfiguration' or 'FlowSchema' typeName() string // the Kind of the object
TypeName() string getName() string // the object's name
create(context.Context) error // request the server to create the object
getCurrent() (wantAndHave, error) // pair up with the object as it currently exists
}
configurationClient // wantAndHave is a pair of versions of an APF config object.
specCopier // The "want" has the spec that the code provides.
// The "have" is what came from the server.
type wantAndHave interface {
getWant() configurationObject
getHave() configurationObject
specsDiffer() bool
// copyHave returns a copy of the "have" version,
// optionally with spec replaced by the spec from "want".
copyHave(specFromWant bool) updatable
}
// updatable is an APF config object that can be written back to the apiserver
type updatable interface {
configurationObject
update(context.Context) error
} }
// A convenient wrapper interface that is used by the ensure logic. // A convenient wrapper interface that is used by the ensure logic.
@ -105,17 +113,17 @@ type configurationObject interface {
runtime.Object runtime.Object
} }
func newSuggestedEnsureStrategy(copier specCopier) ensureStrategy { // NewSuggestedEnsureStrategy returns an EnsureStrategy for suggested config objects
func NewSuggestedEnsureStrategy() EnsureStrategy {
return &strategy{ return &strategy{
copier: copier,
alwaysAutoUpdateSpec: false, alwaysAutoUpdateSpec: false,
name: "suggested", name: "suggested",
} }
} }
func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy { // NewMandatoryEnsureStrategy returns an EnsureStrategy for mandatory config objects
func NewMandatoryEnsureStrategy() EnsureStrategy {
return &strategy{ return &strategy{
copier: copier,
alwaysAutoUpdateSpec: true, alwaysAutoUpdateSpec: true,
name: "mandatory", name: "mandatory",
} }
@ -123,7 +131,6 @@ func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy {
// auto-update strategy for the configuration objects // auto-update strategy for the configuration objects
type strategy struct { type strategy struct {
copier specCopier
alwaysAutoUpdateSpec bool alwaysAutoUpdateSpec bool
name string name string
} }
@ -132,8 +139,10 @@ func (s *strategy) Name() string {
return s.name return s.name
} }
func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime.Object, bool, error) { func (s *strategy) ShouldUpdate(wah wantAndHave) (updatable, bool, error) {
if current == nil || bootstrap == nil { current := wah.getHave()
if current == nil {
return nil, false, nil return nil, false, nil
} }
@ -143,39 +152,23 @@ func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime
} }
updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec) updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec)
var specChanged bool specChanged := autoUpdateSpec && wah.specsDiffer()
if autoUpdateSpec {
changed, err := s.copier.HasSpecChanged(bootstrap, current)
if err != nil {
return nil, false, fmt.Errorf("failed to compare spec - %w", err)
}
specChanged = changed
}
if !(updateAnnotation || specChanged) { if !(updateAnnotation || specChanged) {
// the annotation key is up to date and the spec has not changed, no update is necessary // the annotation key is up to date and the spec has not changed, no update is necessary
return nil, false, nil return nil, false, nil
} }
// if we are here, either we need to update the annotation key or the spec. revised := wah.copyHave(specChanged)
copy, ok := current.DeepCopyObject().(configurationObject)
if !ok {
// we should never be here
return nil, false, errors.New("incompatible object type")
}
if updateAnnotation { if updateAnnotation {
setAutoUpdateAnnotation(copy, autoUpdateSpec) setAutoUpdateAnnotation(revised, autoUpdateSpec)
}
if specChanged {
s.copier.CopySpec(bootstrap, copy)
} }
return copy, true, nil return revised, true, nil
} }
// shouldUpdateSpec inspects the auto-update annotation key and generation field to determine // shouldUpdateSpec inspects the auto-update annotation key and generation field to determine
// whether the configurationWrapper object should be auto-updated. // whether the config object should be auto-updated.
func shouldUpdateSpec(accessor metav1.Object) bool { func shouldUpdateSpec(accessor metav1.Object) bool {
value, _ := accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] value, _ := accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey]
if autoUpdate, err := strconv.ParseBool(value); err == nil { if autoUpdate, err := strconv.ParseBool(value); err == nil {
@ -215,130 +208,130 @@ func setAutoUpdateAnnotation(accessor metav1.Object, autoUpdate bool) {
accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate) accessor.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate)
} }
// ensureConfiguration ensures the boostrap configurationWrapper on the cluster based on the specified strategy. // EnsureConfigurations applies the given maintenance strategy to the given objects.
func ensureConfiguration(wrapper configurationWrapper, strategy ensureStrategy, bootstrap configurationObject) error { // At the first error, if any, it stops and returns that error.
name := bootstrap.GetName() func EnsureConfigurations(ctx context.Context, boots BootstrapObjects, strategy EnsureStrategy) error {
len := boots.len()
for i := 0; i < len; i++ {
bo := boots.get(i)
err := EnsureConfiguration(ctx, bo, strategy)
if err != nil {
return err
}
}
return nil
}
// EnsureConfiguration applies the given maintenance strategy to the given object.
func EnsureConfiguration(ctx context.Context, bootstrap bootstrapObject, strategy EnsureStrategy) error {
name := bootstrap.getName()
configurationType := strategy.Name() configurationType := strategy.Name()
var current configurationObject var wah wantAndHave
var err error var err error
for { for {
current, err = wrapper.Get(bootstrap.GetName()) wah, err = bootstrap.getCurrent()
if err == nil { if err == nil {
break break
} }
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
} }
// we always re-create a missing configuration object // we always re-create a missing configuration object
if _, err = wrapper.Create(bootstrap); err == nil { if err = bootstrap.create(ctx); err == nil {
klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", bootstrap.typeName()), "type", configurationType, "name", name)
return nil return nil
} }
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("cannot create %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
} }
klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", bootstrap.typeName()), "type", configurationType, "name", name)
} }
klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", bootstrap.typeName()), "type", configurationType, "name", name)
newObject, update, err := strategy.ShouldUpdate(current, bootstrap) newObject, update, err := strategy.ShouldUpdate(wah)
if err != nil { if err != nil {
return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
} }
if !update { if !update {
if klogV := klog.V(5); klogV.Enabled() { if klogV := klog.V(5); klogV.Enabled() {
klogV.InfoS("No update required", "wrapper", wrapper.TypeName(), "type", configurationType, "name", name, klogV.InfoS("No update required", "wrapper", bootstrap.typeName(), "type", configurationType, "name", name,
"diff", cmp.Diff(current, bootstrap)) "diff", cmp.Diff(wah.getHave(), wah.getWant()))
} }
return nil return nil
} }
if _, err = wrapper.Update(newObject); err == nil { if err = newObject.update(ctx); err == nil {
klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, newObject)) klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", bootstrap.typeName(), configurationType, name, cmp.Diff(wah.getHave(), wah.getWant()))
return nil return nil
} }
if apierrors.IsConflict(err) { if apierrors.IsConflict(err) {
klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", bootstrap.typeName()), "type", configurationType, "name", name)
return nil return nil
} }
return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
} }
// removeAutoUpdateEnabledConfiguration makes an attempt to remove the given // RemoveUnwantedObjects attempts to delete the configuration objects
// configuration object if automatic update of the spec is enabled for this object. // that exist, are annotated `apf.kubernetes.io/autoupdate-spec=true`, and do not
func removeAutoUpdateEnabledConfiguration(wrapper configurationWrapper, name string) error { // have a name in the given set. A refusal due to concurrent update is logged
current, err := wrapper.Get(name) // and not considered an error; the object will be reconsidered later.
func RemoveUnwantedObjects(ctx context.Context, boots BootstrapObjects) error {
current, err := boots.getExistingObjects()
if err != nil { if err != nil {
if apierrors.IsNotFound(err) { return err
return nil
} }
wantedNames := namesOfBootstrapObjects(boots)
return fmt.Errorf("failed to retrieve the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err) for _, object := range current {
name := object.GetName()
if wantedNames.Has(name) {
continue
} }
var value string
value := current.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey] var ok, autoUpdate bool
autoUpdate, err := strconv.ParseBool(value) var err error
if err != nil { if value, ok = object.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey]; !ok {
klog.ErrorS(err, fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name)
// This may need manual intervention, in case the annotation value is malformed,
// so don't return an error, that might trigger futile retry loop.
return nil
}
if !autoUpdate {
klog.V(5).InfoS(fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name)
return nil
}
if err := wrapper.Delete(name); err != nil {
if apierrors.IsNotFound(err) {
klog.V(5).InfoS(fmt.Sprintf("Something concurrently deleted the %s", wrapper.TypeName()), "name", name)
return nil
}
return fmt.Errorf("failed to delete the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err)
}
klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the %s", wrapper.TypeName()), "name", name)
return nil
}
// getDanglingBootstrapObjectNames returns a list of names of bootstrap
// configuration objects that are potentially candidates for deletion from
// the cluster, given a set of bootstrap and current configuration.
// - bootstrap: a set of hard coded configuration kube-apiserver maintains in-memory.
// - current: a set of configuration objects that exist on the cluster
//
// Any object present in current is added to the list if both a and b are true:
//
// a. the object in current is missing from the bootstrap configuration
// b. the object has the designated auto-update annotation key
//
// This function shares the common logic for both FlowSchema and
// PriorityLevelConfiguration type and hence it accepts metav1.Object only.
func getDanglingBootstrapObjectNames(bootstrap sets.String, current []metav1.Object) []string {
if len(current) == 0 {
return nil
}
candidates := make([]string, 0)
for i := range current {
object := current[i]
if _, ok := object.GetAnnotations()[flowcontrolv1beta3.AutoUpdateAnnotationKey]; !ok {
// the configuration object does not have the annotation key, // the configuration object does not have the annotation key,
// it's probably a user defined configuration object, // it's probably a user defined configuration object,
// so we can skip it. // so we can skip it.
klog.V(5).InfoS("Skipping deletion of APF object with no "+flowcontrolv1beta3.AutoUpdateAnnotationKey+" annotation", "name", name)
continue continue
} }
autoUpdate, err = strconv.ParseBool(value)
if err != nil {
// Log this because it is not an expected situation.
klog.V(4).InfoS("Skipping deletion of APF object with malformed "+flowcontrolv1beta3.AutoUpdateAnnotationKey+" annotation", "name", name, "annotationValue", value, "parseError", err)
continue
}
if !autoUpdate {
klog.V(5).InfoS("Skipping deletion of APF object with "+flowcontrolv1beta3.AutoUpdateAnnotationKey+"=false annotation", "name", name)
continue
}
// TODO: expectedResourceVersion := object.GetResourceVersion()
err = object.delete(ctx /* TODO: expectedResourceVersion */)
if err == nil {
klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the unwanted %s", boots.typeName()), "name", name)
continue
}
if apierrors.IsNotFound(err) {
klog.V(5).InfoS("Unwanted APF object was concurrently deleted", "name", name)
} else {
return fmt.Errorf("failed to delete unwatned APF object %q - %w", name, err)
}
}
return nil
}
if _, ok := bootstrap[object.GetName()]; !ok { func namesOfBootstrapObjects(bos BootstrapObjects) sets.String {
candidates = append(candidates, object.GetName()) names := sets.NewString()
len := bos.len()
for i := 0; i < len; i++ {
bo := bos.get(i)
names.Insert(bo.getName())
} }
} return names
return candidates
} }

View File

@ -141,6 +141,7 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo
return fmt.Errorf("failed to initialize clientset for APF - %w", err) return fmt.Errorf("failed to initialize clientset for APF - %w", err)
} }
err = func() error {
// get a derived context that gets cancelled after 5m or // get a derived context that gets cancelled after 5m or
// when the StopCh gets closed, whichever happens first. // when the StopCh gets closed, whichever happens first.
ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute) ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute)
@ -154,23 +155,29 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo
ctx, ctx,
time.Second, time.Second,
func(context.Context) (bool, error) { func(context.Context) (bool, error) {
if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil { if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
return false, nil return false, nil
} }
return true, nil return true, nil
}) })
if err != nil { if err != nil {
return fmt.Errorf("unable to initialize APF bootstrap configuration") return fmt.Errorf("unable to initialize APF bootstrap configuration: %w", err)
}
return nil
}()
if err != nil {
return err
} }
// we have successfully initialized the bootstrap configuration, now we // we have successfully initialized the bootstrap configuration, now we
// spin up a goroutine which reconciles the bootstrap configuration periodically. // spin up a goroutine which reconciles the bootstrap configuration periodically.
go func() { go func() {
ctx := wait.ContextForChannel(hookContext.StopCh)
wait.PollImmediateUntil( wait.PollImmediateUntil(
time.Minute, time.Minute,
func() (bool, error) { func() (bool, error) {
if err := ensure(clientset, bce.fsLister, bce.plcLister); err != nil { if err := ensure(ctx, clientset, bce.fsLister, bce.plcLister); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later") klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
} }
// always auto update both suggested and mandatory configuration // always auto update both suggested and mandatory configuration
@ -182,79 +189,64 @@ func (bce *bootstrapConfigurationEnsurer) ensureAPFBootstrapConfiguration(hookCo
return nil return nil
} }
func ensure(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { func ensure(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
if err := ensureSuggestedConfiguration(clientset, fsLister, plcLister); err != nil {
if err := ensureSuggestedConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
// We should not attempt creation of mandatory objects if ensuring the suggested // We should not attempt creation of mandatory objects if ensuring the suggested
// configuration resulted in an error. // configuration resulted in an error.
// This only happens when the stop channel is closed. // This only happens when the stop channel is closed.
return fmt.Errorf("failed ensuring suggested settings - %w", err) return fmt.Errorf("failed ensuring suggested settings - %w", err)
} }
if err := ensureMandatoryConfiguration(clientset, fsLister, plcLister); err != nil { if err := ensureMandatoryConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
return fmt.Errorf("failed ensuring mandatory settings - %w", err) return fmt.Errorf("failed ensuring mandatory settings - %w", err)
} }
if err := removeDanglingBootstrapConfiguration(clientset, fsLister, plcLister); err != nil { if err := removeDanglingBootstrapConfiguration(ctx, clientset, fsLister, plcLister); err != nil {
return fmt.Errorf("failed to delete removed settings - %w", err) return fmt.Errorf("failed to delete removed settings - %w", err)
} }
return nil return nil
} }
func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { func ensureSuggestedConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations(), plcLister) plcSuggesteds := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
if err := plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations); err != nil { if err := ensurer.EnsureConfigurations(ctx, plcSuggesteds, ensurer.NewSuggestedEnsureStrategy()); err != nil {
return err return err
} }
fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister) fsSuggesteds := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, flowcontrolbootstrap.SuggestedFlowSchemas)
return fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas) return ensurer.EnsureConfigurations(ctx, fsSuggesteds, ensurer.NewSuggestedEnsureStrategy())
} }
func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { func ensureMandatoryConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas(), fsLister) plcMandatories := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, flowcontrolbootstrap.MandatoryPriorityLevelConfigurations)
if err := fsEnsurer.Ensure(flowcontrolbootstrap.MandatoryFlowSchemas); err != nil { if err := ensurer.EnsureConfigurations(ctx, plcMandatories, ensurer.NewMandatoryEnsureStrategy()); err != nil {
return err return err
} }
plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations(), plcLister) fsMandatories := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, flowcontrolbootstrap.MandatoryFlowSchemas)
return plEnsurer.Ensure(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations) return ensurer.EnsureConfigurations(ctx, fsMandatories, ensurer.NewMandatoryEnsureStrategy())
} }
func removeDanglingBootstrapConfiguration(clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error { func removeDanglingBootstrapConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
if err := removeDanglingBootstrapFlowSchema(clientset.FlowSchemas(), fsLister); err != nil { if err := removeDanglingBootstrapFlowSchema(ctx, clientset, fsLister); err != nil {
return err return err
} }
return removeDanglingBootstrapPriorityLevel(clientset.PriorityLevelConfigurations(), plcLister) return removeDanglingBootstrapPriorityLevel(ctx, clientset, plcLister)
} }
func removeDanglingBootstrapFlowSchema(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister) error { func removeDanglingBootstrapFlowSchema(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister) error {
bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...) bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...)
candidates, err := ensurer.GetFlowSchemaRemoveCandidates(lister, bootstrap) fsBoots := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, bootstrap)
if err != nil { return ensurer.RemoveUnwantedObjects(ctx, fsBoots)
return err
}
if len(candidates) == 0 {
return nil
} }
fsRemover := ensurer.NewFlowSchemaRemover(client, lister) func removeDanglingBootstrapPriorityLevel(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
return fsRemover.RemoveAutoUpdateEnabledObjects(candidates)
}
func removeDanglingBootstrapPriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) error {
bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...) bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...)
candidates, err := ensurer.GetPriorityLevelRemoveCandidates(lister, bootstrap) plcBoots := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, bootstrap)
if err != nil { return ensurer.RemoveUnwantedObjects(ctx, plcBoots)
return err
}
if len(candidates) == 0 {
return nil
}
plRemover := ensurer.NewPriorityLevelRemover(client, lister)
return plRemover.RemoveAutoUpdateEnabledObjects(candidates)
} }
// contextFromChannelAndMaxWaitDuration returns a Context that is bound to the // contextFromChannelAndMaxWaitDuration returns a Context that is bound to the