Usse generics to simplify pkg/registry/flowcontrol/ensurer

Signed-off-by: Mike Spreitzer <mspreitz@us.ibm.com>
This commit is contained in:
Mike Spreitzer 2023-06-05 23:27:46 -04:00
parent 1ff1a26426
commit 92a35f5bca
6 changed files with 199 additions and 439 deletions

View File

@ -17,153 +17,25 @@ limitations under the License.
package ensurer
import (
"context"
"fmt"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
)
// WrapBootstrapFlowSchemas creates a generic representation of the given bootstrap objects bound with their operations
// Every object in `boots` is immutable.
func WrapBootstrapFlowSchemas(client flowcontrolclient.FlowSchemaInterface, lister flowcontrollisters.FlowSchemaLister, boots []*flowcontrolv1beta3.FlowSchema) BootstrapObjects {
return &bootstrapFlowSchemas{
flowSchemaClient: flowSchemaClient{
client: client,
lister: lister},
boots: boots,
}
func NewFlowSchemaOps(client flowcontrolclient.FlowSchemaInterface, cache flowcontrollisters.FlowSchemaLister) ObjectOps[*flowcontrolv1beta3.FlowSchema] {
return NewObjectOps[*flowcontrolv1beta3.FlowSchema](client, cache, (*flowcontrolv1beta3.FlowSchema).DeepCopy, flowSchemaReplaceSpec, flowSchemaSpecEqual)
}
type flowSchemaClient struct {
client flowcontrolclient.FlowSchemaInterface
lister flowcontrollisters.FlowSchemaLister
func flowSchemaReplaceSpec(into, from *flowcontrolv1beta3.FlowSchema) *flowcontrolv1beta3.FlowSchema {
copy := into.DeepCopy()
copy.Spec = *from.Spec.DeepCopy()
return copy
}
type bootstrapFlowSchemas struct {
flowSchemaClient
// Every member is a pointer to immutable content
boots []*flowcontrolv1beta3.FlowSchema
}
func (*flowSchemaClient) typeName() string {
return "FlowSchema"
}
func (boots *bootstrapFlowSchemas) len() int {
return len(boots.boots)
}
func (boots *bootstrapFlowSchemas) get(i int) bootstrapObject {
return &bootstrapFlowSchema{
flowSchemaClient: &boots.flowSchemaClient,
bootstrap: boots.boots[i],
}
}
func (boots *bootstrapFlowSchemas) getExistingObjects() ([]deletable, error) {
objs, err := boots.lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list FlowSchema objects - %w", err)
}
dels := make([]deletable, len(objs))
for i, obj := range objs {
dels[i] = &deletableFlowSchema{
FlowSchema: obj,
client: boots.client,
}
}
return dels, nil
}
type bootstrapFlowSchema struct {
*flowSchemaClient
// points to immutable contnet
bootstrap *flowcontrolv1beta3.FlowSchema
}
func (boot *bootstrapFlowSchema) getName() string {
return boot.bootstrap.Name
}
func (boot *bootstrapFlowSchema) create(ctx context.Context) error {
// Copy the object here because the Encoder in the client code may modify the object; see
// https://github.com/kubernetes/kubernetes/pull/117107
// and WithVersionEncoder in apimachinery/pkg/runtime/helper.go.
_, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager})
return err
}
func (boot *bootstrapFlowSchema) getCurrent() (wantAndHave, error) {
current, err := boot.lister.Get(boot.bootstrap.Name)
if err != nil {
return nil, err
}
return &wantAndHaveFlowSchema{
client: boot.client,
want: boot.bootstrap,
have: current,
}, nil
}
type wantAndHaveFlowSchema struct {
client flowcontrolclient.FlowSchemaInterface
want *flowcontrolv1beta3.FlowSchema
have *flowcontrolv1beta3.FlowSchema
}
func (wah *wantAndHaveFlowSchema) getWant() configurationObject {
return wah.want
}
func (wah *wantAndHaveFlowSchema) getHave() configurationObject {
return wah.have
}
func (wah *wantAndHaveFlowSchema) copyHave(specFromWant bool) updatable {
copy := wah.have.DeepCopy()
if specFromWant {
copy.Spec = *wah.want.Spec.DeepCopy()
}
return &updatableFlowSchema{
FlowSchema: copy,
client: wah.client,
}
}
func (wah *wantAndHaveFlowSchema) specsDiffer() bool {
return flowSchemaSpecChanged(wah.want, wah.have)
}
func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta3.FlowSchema) bool {
copiedExpectedFlowSchema := expected.DeepCopy()
flowcontrolapisv1beta3.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec)
}
type updatableFlowSchema struct {
*flowcontrolv1beta3.FlowSchema
client flowcontrolclient.FlowSchemaInterface
}
func (u *updatableFlowSchema) update(ctx context.Context) error {
_, err := u.client.Update(ctx, u.FlowSchema, metav1.UpdateOptions{FieldManager: fieldManager})
return err
}
type deletableFlowSchema struct {
*flowcontrolv1beta3.FlowSchema
client flowcontrolclient.FlowSchemaInterface
}
func (dbl *deletableFlowSchema) delete(ctx context.Context /* TODO: resourceVersion string */) error {
// return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}})
return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{})
func flowSchemaSpecEqual(expected, actual *flowcontrolv1beta3.FlowSchema) bool {
copiedExpectedSpec := expected.Spec.DeepCopy()
flowcontrolapisv1beta3.SetDefaults_FlowSchemaSpec(copiedExpectedSpec)
return equality.Semantic.DeepEqual(copiedExpectedSpec, &actual.Spec)
}

View File

@ -27,7 +27,7 @@ import (
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
"k8s.io/client-go/tools/cache"
toolscache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
@ -42,7 +42,7 @@ func init() {
func TestEnsureFlowSchema(t *testing.T) {
tests := []struct {
name string
strategy func() EnsureStrategy
strategy func() EnsureStrategy[*flowcontrolv1beta3.FlowSchema]
current *flowcontrolv1beta3.FlowSchema
bootstrap *flowcontrolv1beta3.FlowSchema
expected *flowcontrolv1beta3.FlowSchema
@ -50,21 +50,21 @@ func TestEnsureFlowSchema(t *testing.T) {
// for suggested configurations
{
name: "suggested flow schema does not exist - the object should always be re-created",
strategy: NewSuggestedEnsureStrategy,
strategy: NewSuggestedEnsureStrategy[*flowcontrolv1beta3.FlowSchema],
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: nil,
expected: newFlowSchema("fs1", "pl1", 100).Object(),
},
{
name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated",
strategy: NewSuggestedEnsureStrategy,
strategy: NewSuggestedEnsureStrategy[*flowcontrolv1beta3.FlowSchema],
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: NewSuggestedEnsureStrategy,
strategy: NewSuggestedEnsureStrategy[*flowcontrolv1beta3.FlowSchema],
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
@ -73,21 +73,21 @@ func TestEnsureFlowSchema(t *testing.T) {
// for mandatory configurations
{
name: "mandatory flow schema does not exist - new object should be created",
strategy: NewMandatoryEnsureStrategy,
strategy: NewMandatoryEnsureStrategy[*flowcontrolv1beta3.FlowSchema],
bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
current: nil,
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory flow schema exists, annotation is missing - annotation should be added",
strategy: NewMandatoryEnsureStrategy,
strategy: NewMandatoryEnsureStrategy[*flowcontrolv1beta3.FlowSchema],
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 100).Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated",
strategy: NewMandatoryEnsureStrategy,
strategy: NewMandatoryEnsureStrategy[*flowcontrolv1beta3.FlowSchema],
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
@ -97,15 +97,16 @@ func TestEnsureFlowSchema(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta3().FlowSchemas()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
indexer := toolscache.NewIndexer(toolscache.MetaNamespaceKeyFunc, toolscache.Indexers{})
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current)
}
boots := WrapBootstrapFlowSchemas(client, flowcontrollisters.NewFlowSchemaLister(indexer), []*flowcontrolv1beta3.FlowSchema{test.bootstrap})
ops := NewFlowSchemaOps(client, flowcontrollisters.NewFlowSchemaLister(indexer))
boots := []*flowcontrolv1beta3.FlowSchema{test.bootstrap}
strategy := test.strategy()
err := EnsureConfigurations(context.Background(), boots, strategy)
err := EnsureConfigurations(context.Background(), ops, boots, strategy)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
@ -208,24 +209,17 @@ func TestSuggestedFSEnsureStrategy_ShouldUpdate(t *testing.T) {
},
}
ops := NewFlowSchemaOps(nil, nil)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
wah := &wantAndHaveFlowSchema{
want: test.bootstrap,
have: test.current,
}
strategy := NewSuggestedEnsureStrategy()
updatableGot, updateGot, err := strategy.ShouldUpdate(wah)
strategy := NewSuggestedEnsureStrategy[*flowcontrolv1beta3.FlowSchema]()
updatableGot, updateGot, err := strategy.ReviseIfNeeded(ops, test.current, test.bootstrap)
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
}
var newObjectGot *flowcontrolv1beta3.FlowSchema
if updatableGot != nil {
newObjectGot = updatableGot.(*updatableFlowSchema).FlowSchema
}
if test.newObjectExpected == nil {
if newObjectGot != nil {
t.Errorf("Expected a nil object, but got: %#v", newObjectGot)
if updatableGot != nil {
t.Errorf("Expected a nil object, but got: %#v", updatableGot)
}
if updateGot {
t.Errorf("Expected update=%t but got: %t", false, updateGot)
@ -236,8 +230,8 @@ func TestSuggestedFSEnsureStrategy_ShouldUpdate(t *testing.T) {
if !updateGot {
t.Errorf("Expected update=%t but got: %t", true, updateGot)
}
if !reflect.DeepEqual(test.newObjectExpected, newObjectGot) {
t.Errorf("Expected the object to be updated to match - diff: %s", cmp.Diff(test.newObjectExpected, newObjectGot))
if !reflect.DeepEqual(test.newObjectExpected, updatableGot) {
t.Errorf("Expected the object to be updated to match - diff: %s", cmp.Diff(test.newObjectExpected, updatableGot))
}
})
}
@ -284,7 +278,7 @@ func TestFlowSchemaSpecChanged(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
w := flowSchemaSpecChanged(testCase.expected, testCase.actual)
w := !flowSchemaSpecEqual(testCase.expected, testCase.actual)
assert.Equal(t, testCase.specChanged, w)
})
}
@ -343,14 +337,15 @@ func TestRemoveFlowSchema(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta3().FlowSchemas()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
indexer := toolscache.NewIndexer(toolscache.MetaNamespaceKeyFunc, toolscache.Indexers{})
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current)
}
bootFS := newFlowSchema(test.bootstrapName, "pl", 100).Object()
boots := WrapBootstrapFlowSchemas(client, flowcontrollisters.NewFlowSchemaLister(indexer), []*flowcontrolv1beta3.FlowSchema{bootFS})
err := RemoveUnwantedObjects(context.Background(), boots)
ops := NewFlowSchemaOps(client, flowcontrollisters.NewFlowSchemaLister(indexer))
boots := []*flowcontrolv1beta3.FlowSchema{bootFS}
err := RemoveUnwantedObjects(context.Background(), ops, boots)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)

View File

@ -17,153 +17,26 @@ limitations under the License.
package ensurer
import (
"context"
"fmt"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
)
// WrapBootstrapPriorityLevelConfigurations creates a generic representation of the given bootstrap objects bound with their operations.
// Every object in `boots` is immutable.
func WrapBootstrapPriorityLevelConfigurations(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister, boots []*flowcontrolv1beta3.PriorityLevelConfiguration) BootstrapObjects {
return &bootstrapPriorityLevelConfigurations{
priorityLevelConfigurationClient: priorityLevelConfigurationClient{
client: client,
lister: lister},
boots: boots,
}
func NewPriorityLevelConfigurationOps(client flowcontrolclient.PriorityLevelConfigurationInterface, lister flowcontrollisters.PriorityLevelConfigurationLister) ObjectOps[*flowcontrolv1beta3.PriorityLevelConfiguration] {
return NewObjectOps[*flowcontrolv1beta3.PriorityLevelConfiguration](client, lister, (*flowcontrolv1beta3.PriorityLevelConfiguration).DeepCopy,
plcReplaceSpec, plcSpecEqual)
}
type priorityLevelConfigurationClient struct {
client flowcontrolclient.PriorityLevelConfigurationInterface
lister flowcontrollisters.PriorityLevelConfigurationLister
func plcReplaceSpec(into, from *flowcontrolv1beta3.PriorityLevelConfiguration) *flowcontrolv1beta3.PriorityLevelConfiguration {
copy := into.DeepCopy()
copy.Spec = *from.Spec.DeepCopy()
return copy
}
type bootstrapPriorityLevelConfigurations struct {
priorityLevelConfigurationClient
// Every member is a pointer to immutable content
boots []*flowcontrolv1beta3.PriorityLevelConfiguration
}
func (*priorityLevelConfigurationClient) typeName() string {
return "PriorityLevelConfiguration"
}
func (boots *bootstrapPriorityLevelConfigurations) len() int {
return len(boots.boots)
}
func (boots *bootstrapPriorityLevelConfigurations) get(i int) bootstrapObject {
return &bootstrapPriorityLevelConfiguration{
priorityLevelConfigurationClient: &boots.priorityLevelConfigurationClient,
bootstrap: boots.boots[i],
}
}
func (boots *bootstrapPriorityLevelConfigurations) getExistingObjects() ([]deletable, error) {
objs, err := boots.lister.List(labels.Everything())
if err != nil {
return nil, fmt.Errorf("failed to list PriorityLevelConfiguration objects - %w", err)
}
dels := make([]deletable, len(objs))
for i, obj := range objs {
dels[i] = &deletablePriorityLevelConfiguration{
PriorityLevelConfiguration: obj,
client: boots.client,
}
}
return dels, nil
}
type bootstrapPriorityLevelConfiguration struct {
*priorityLevelConfigurationClient
// points to immutable content
bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration
}
func (boot *bootstrapPriorityLevelConfiguration) getName() string {
return boot.bootstrap.Name
}
func (boot *bootstrapPriorityLevelConfiguration) create(ctx context.Context) error {
// Copy the object here because the Encoder in the client code may modify the object; see
// https://github.com/kubernetes/kubernetes/pull/117107
// and WithVersionEncoder in apimachinery/pkg/runtime/helper.go.
_, err := boot.client.Create(ctx, boot.bootstrap.DeepCopy(), metav1.CreateOptions{FieldManager: fieldManager})
return err
}
func (boot *bootstrapPriorityLevelConfiguration) getCurrent() (wantAndHave, error) {
current, err := boot.lister.Get(boot.bootstrap.Name)
if err != nil {
return nil, err
}
return &wantAndHavePriorityLevelConfiguration{
client: boot.client,
want: boot.bootstrap,
have: current,
}, nil
}
type wantAndHavePriorityLevelConfiguration struct {
client flowcontrolclient.PriorityLevelConfigurationInterface
want *flowcontrolv1beta3.PriorityLevelConfiguration
have *flowcontrolv1beta3.PriorityLevelConfiguration
}
func (wah *wantAndHavePriorityLevelConfiguration) getWant() configurationObject {
return wah.want
}
func (wah *wantAndHavePriorityLevelConfiguration) getHave() configurationObject {
return wah.have
}
func (wah *wantAndHavePriorityLevelConfiguration) copyHave(specFromWant bool) updatable {
copy := wah.have.DeepCopy()
if specFromWant {
copy.Spec = *wah.want.Spec.DeepCopy()
}
return &updatablePriorityLevelConfiguration{
PriorityLevelConfiguration: copy,
client: wah.client,
}
}
func (wah *wantAndHavePriorityLevelConfiguration) specsDiffer() bool {
return priorityLevelSpecChanged(wah.want, wah.have)
}
func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta3.PriorityLevelConfiguration) bool {
copiedExpectedPriorityLevel := expected.DeepCopy()
flowcontrolapisv1beta3.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel)
return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec)
}
type updatablePriorityLevelConfiguration struct {
*flowcontrolv1beta3.PriorityLevelConfiguration
client flowcontrolclient.PriorityLevelConfigurationInterface
}
func (u *updatablePriorityLevelConfiguration) update(ctx context.Context) error {
_, err := u.client.Update(ctx, u.PriorityLevelConfiguration, metav1.UpdateOptions{FieldManager: fieldManager})
return err
}
type deletablePriorityLevelConfiguration struct {
*flowcontrolv1beta3.PriorityLevelConfiguration
client flowcontrolclient.PriorityLevelConfigurationInterface
}
func (dbl *deletablePriorityLevelConfiguration) delete(ctx context.Context /* resourceVersion string */) error {
// return dbl.client.Delete(context.TODO(), dbl.Name, metav1.DeleteOptions{Preconditions: &metav1.Preconditions{ResourceVersion: &resourceVersion}})
return dbl.client.Delete(ctx, dbl.Name, metav1.DeleteOptions{})
func plcSpecEqual(expected, actual *flowcontrolv1beta3.PriorityLevelConfiguration) bool {
copiedExpected := expected.DeepCopy()
flowcontrolapisv1beta3.SetObjectDefaults_PriorityLevelConfiguration(copiedExpected)
return equality.Semantic.DeepEqual(copiedExpected.Spec, actual.Spec)
}

View File

@ -27,7 +27,7 @@ import (
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake"
flowcontrollisters "k8s.io/client-go/listers/flowcontrol/v1beta3"
"k8s.io/client-go/tools/cache"
toolscache "k8s.io/client-go/tools/cache"
flowcontrolapisv1beta3 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta3"
"k8s.io/utils/pointer"
@ -37,7 +37,7 @@ import (
func TestEnsurePriorityLevel(t *testing.T) {
tests := []struct {
name string
strategy func() EnsureStrategy
strategy func() EnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration]
current *flowcontrolv1beta3.PriorityLevelConfiguration
bootstrap *flowcontrolv1beta3.PriorityLevelConfiguration
expected *flowcontrolv1beta3.PriorityLevelConfiguration
@ -45,21 +45,21 @@ func TestEnsurePriorityLevel(t *testing.T) {
// for suggested configurations
{
name: "suggested priority level configuration does not exist - the object should always be re-created",
strategy: NewSuggestedEnsureStrategy,
strategy: NewSuggestedEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration],
bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(),
current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).Object(),
},
{
name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated",
strategy: NewSuggestedEnsureStrategy,
strategy: NewSuggestedEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration],
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
},
{
name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: NewSuggestedEnsureStrategy,
strategy: NewSuggestedEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration],
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
@ -68,21 +68,21 @@ func TestEnsurePriorityLevel(t *testing.T) {
// for mandatory configurations
{
name: "mandatory priority level configuration does not exist - new object should be created",
strategy: NewMandatoryEnsureStrategy,
strategy: NewMandatoryEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration],
bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory priority level configuration exists, annotation is missing - annotation is added",
strategy: NewMandatoryEnsureStrategy,
strategy: NewMandatoryEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration],
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithLimited(20).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
},
{
name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated",
strategy: NewMandatoryEnsureStrategy,
strategy: NewMandatoryEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration],
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
@ -92,16 +92,17 @@ func TestEnsurePriorityLevel(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta3().PriorityLevelConfigurations()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
indexer := toolscache.NewIndexer(toolscache.MetaNamespaceKeyFunc, toolscache.Indexers{})
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current)
}
boots := WrapBootstrapPriorityLevelConfigurations(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer), []*flowcontrolv1beta3.PriorityLevelConfiguration{test.bootstrap})
ops := NewPriorityLevelConfigurationOps(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer))
boots := []*flowcontrolv1beta3.PriorityLevelConfiguration{test.bootstrap}
strategy := test.strategy()
err := EnsureConfigurations(context.Background(), boots, strategy)
err := EnsureConfigurations(context.Background(), ops, boots, strategy)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
@ -204,24 +205,17 @@ func TestSuggestedPLEnsureStrategy_ShouldUpdate(t *testing.T) {
},
}
ops := NewPriorityLevelConfigurationOps(nil, nil)
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
strategy := NewSuggestedEnsureStrategy()
wah := &wantAndHavePriorityLevelConfiguration{
want: test.bootstrap,
have: test.current,
}
updatableGot, updateGot, err := strategy.ShouldUpdate(wah)
strategy := NewSuggestedEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration]()
updatableGot, updateGot, err := strategy.ReviseIfNeeded(ops, test.current, test.bootstrap)
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
}
var newObjectGot *flowcontrolv1beta3.PriorityLevelConfiguration
if updatableGot != nil {
newObjectGot = updatableGot.(*updatablePriorityLevelConfiguration).PriorityLevelConfiguration
}
if test.newObjectExpected == nil {
if newObjectGot != nil {
t.Errorf("Expected a nil object, but got: %#v", newObjectGot)
if updatableGot != nil {
t.Errorf("Expected a nil object, but got: %#v", updatableGot)
}
if updateGot {
t.Errorf("Expected update=%t but got: %t", false, updateGot)
@ -232,8 +226,8 @@ func TestSuggestedPLEnsureStrategy_ShouldUpdate(t *testing.T) {
if !updateGot {
t.Errorf("Expected update=%t but got: %t", true, updateGot)
}
if !reflect.DeepEqual(test.newObjectExpected, newObjectGot) {
t.Errorf("Expected the object to be updated to match - diff: %s", cmp.Diff(test.newObjectExpected, newObjectGot))
if !reflect.DeepEqual(test.newObjectExpected, updatableGot) {
t.Errorf("Expected the object to be updated to match - diff: %s", cmp.Diff(test.newObjectExpected, updatableGot))
}
})
}
@ -297,7 +291,7 @@ func TestPriorityLevelSpecChanged(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
w := priorityLevelSpecChanged(testCase.expected, testCase.actual)
w := !plcSpecEqual(testCase.expected, testCase.actual)
if testCase.specChanged != w {
t.Errorf("Expected priorityLevelSpecChanged to return %t, but got: %t - diff: %s", testCase.specChanged, w,
cmp.Diff(testCase.expected, testCase.actual))
@ -359,15 +353,16 @@ func TestRemovePriorityLevelConfiguration(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta3().PriorityLevelConfigurations()
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
indexer := toolscache.NewIndexer(toolscache.MetaNamespaceKeyFunc, toolscache.Indexers{})
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
indexer.Add(test.current)
}
boot := newPLConfiguration(test.bootstrapName).Object()
boots := WrapBootstrapPriorityLevelConfigurations(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer), []*flowcontrolv1beta3.PriorityLevelConfiguration{boot})
err := RemoveUnwantedObjects(context.Background(), boots)
boots := []*flowcontrolv1beta3.PriorityLevelConfiguration{boot}
ops := NewPriorityLevelConfigurationOps(client, flowcontrollisters.NewPriorityLevelConfigurationLister(indexer))
err := RemoveUnwantedObjects(context.Background(), ops, boots)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}

View File

@ -25,6 +25,7 @@ import (
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
@ -44,13 +45,13 @@ const (
//
// - suggested: additional configurationWrapper objects for initial behavior.
// the cluster operators have an option to edit or delete these configurationWrapper objects.
type EnsureStrategy interface {
type EnsureStrategy[ObjectType configurationObjectType] interface {
// Name of the strategy, for now we have two: 'mandatory' and 'suggested'.
// This comes handy in logging.
Name() string
// ShouldUpdate accepts a pair of the current and the bootstrap configuration and determines
// whether an update is necessary.
// ReviseIfNeeded accepts a pair of the current and the bootstrap configuration, determines
// whether an update is necessary, and returns a (revised if appropriate) copy of the object.
// current is the existing in-cluster configuration object.
// bootstrap is the configuration the kube-apiserver maintains in-memory.
//
@ -58,65 +59,87 @@ type EnsureStrategy interface {
// ok: true if auto update is required, otherwise false
// err: err is set when the function runs into an error and can not
// determine if auto update is needed.
ShouldUpdate(wantAndHave) (revised updatable, ok bool, err error)
ReviseIfNeeded(objectOps objectLocalOps[ObjectType], current, bootstrap ObjectType) (revised ObjectType, ok bool, err error)
}
// BootstrapObjects is a generic interface to a list of bootstrap objects bound up with the relevant operations on them.
// The binding makes it unnecessary to have any type casts.
// A bootstrap object is a mandatory or suggested config object,
// with the spec that the code is built to provide.
type BootstrapObjects interface {
typeName() string // the Kind of the objects
len() int // number of objects
get(int) bootstrapObject // extract one object, origin 0
getExistingObjects() ([]deletable, error) // returns all the APF config objects that exist at the moment
// objectLocalOps is the needed operations on an individual configurationObject
type objectLocalOps[ObjectType configurationObject] interface {
DeepCopy(ObjectType) ObjectType
// replaceSpec returns a deep copy of `into` except that the spec is a deep copy of `from`
ReplaceSpec(into, from ObjectType) ObjectType
// specEqual says whether applying defaulting to `expected` makes its spec equal that of `actual`
SpecEqual(expected, actual ObjectType) bool
}
// deletable is an existing config object and it supports the delete operation
type deletable interface {
configurationObject
delete(context.Context) error // delete the object. TODO: make conditional on ResouceVersion
// ObjectOps is the needed operations, both as a receiver from a server and server-independent, on configurationObjects
type ObjectOps[ObjectType configurationObject] interface {
client[ObjectType]
cache[ObjectType]
objectLocalOps[ObjectType]
}
// bootstrapObject is a single bootstrap object.
// Its spec is what the code provides.
type bootstrapObject interface {
typeName() string // the Kind of the object
getName() string // the object's name
create(context.Context) error // request the server to create the object
getCurrent() (wantAndHave, error) // pair up with the object as it currently exists
// Client is the needed fragment of the typed generated client stubs for the given object type
type client[ObjectType configurationObject] interface {
Create(ctx context.Context, obj ObjectType, opts metav1.CreateOptions) (ObjectType, error)
Update(ctx context.Context, obj ObjectType, opts metav1.UpdateOptions) (ObjectType, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
}
// wantAndHave is a pair of versions of an APF config object.
// The "want" has the spec that the code provides.
// The "have" is what came from the server.
type wantAndHave interface {
getWant() configurationObject
getHave() configurationObject
specsDiffer() bool
// copyHave returns a copy of the "have" version,
// optionally with spec replaced by the spec from "want".
copyHave(specFromWant bool) updatable
// cache is the needed fragment of the typed generated access ("lister") to an informer's local cache
type cache[ObjectType configurationObject] interface {
List(labels.Selector) ([]ObjectType, error)
Get(name string) (ObjectType, error)
}
// updatable is an APF config object that can be written back to the apiserver
type updatable interface {
configurationObject
update(context.Context) error
}
// A convenient wrapper interface that is used by the ensure logic.
// configurationObject is the relevant interfaces that each API object type implements
type configurationObject interface {
metav1.Object
runtime.Object
}
// configurationObjectType adds the type constraint `comparable` and is thus
// only usable as a type constraint.
type configurationObjectType interface {
comparable
configurationObject
}
type objectOps[ObjectType configurationObjectType] struct {
client[ObjectType]
cache[ObjectType]
deepCopy func(ObjectType) ObjectType
replaceSpec func(ObjectType, ObjectType) ObjectType
specEqual func(expected, actual ObjectType) bool
}
func NewObjectOps[ObjectType configurationObjectType](client client[ObjectType], cache cache[ObjectType],
deepCopy func(ObjectType) ObjectType,
replaceSpec func(ObjectType, ObjectType) ObjectType,
specEqual func(expected, actual ObjectType) bool,
) ObjectOps[ObjectType] {
return objectOps[ObjectType]{client: client,
cache: cache,
deepCopy: deepCopy,
replaceSpec: replaceSpec,
specEqual: specEqual}
}
func (oo objectOps[ObjectType]) DeepCopy(obj ObjectType) ObjectType { return oo.deepCopy(obj) }
func (oo objectOps[ObjectType]) ReplaceSpec(into, from ObjectType) ObjectType {
return oo.replaceSpec(into, from)
}
func (oo objectOps[ObjectType]) SpecEqual(expected, actual ObjectType) bool {
return oo.specEqual(expected, actual)
}
// NewSuggestedEnsureStrategy returns an EnsureStrategy for suggested config objects
func NewSuggestedEnsureStrategy() EnsureStrategy {
return &strategy{
alwaysAutoUpdateSpecFn: func(_ wantAndHave) bool {
func NewSuggestedEnsureStrategy[ObjectType configurationObjectType]() EnsureStrategy[ObjectType] {
return &strategy[ObjectType]{
alwaysAutoUpdateSpecFn: func(want, have ObjectType) bool {
return false
},
name: "suggested",
@ -124,9 +147,9 @@ func NewSuggestedEnsureStrategy() EnsureStrategy {
}
// NewMandatoryEnsureStrategy returns an EnsureStrategy for mandatory config objects
func NewMandatoryEnsureStrategy() EnsureStrategy {
return &strategy{
alwaysAutoUpdateSpecFn: func(_ wantAndHave) bool {
func NewMandatoryEnsureStrategy[ObjectType configurationObjectType]() EnsureStrategy[ObjectType] {
return &strategy[ObjectType]{
alwaysAutoUpdateSpecFn: func(want, have ObjectType) bool {
return true
},
name: "mandatory",
@ -134,36 +157,41 @@ func NewMandatoryEnsureStrategy() EnsureStrategy {
}
// auto-update strategy for the configuration objects
type strategy struct {
alwaysAutoUpdateSpecFn func(wantAndHave) bool
type strategy[ObjectType configurationObjectType] struct {
alwaysAutoUpdateSpecFn func(want, have ObjectType) bool
name string
}
func (s *strategy) Name() string {
func (s *strategy[ObjectType]) Name() string {
return s.name
}
func (s *strategy) ShouldUpdate(wah wantAndHave) (updatable, bool, error) {
current := wah.getHave()
if current == nil {
return nil, false, nil
func (s *strategy[ObjectType]) ReviseIfNeeded(objectOps objectLocalOps[ObjectType], current, bootstrap ObjectType) (ObjectType, bool, error) {
var zero ObjectType
if current == zero {
return zero, false, nil
}
autoUpdateSpec := s.alwaysAutoUpdateSpecFn(wah)
autoUpdateSpec := s.alwaysAutoUpdateSpecFn(bootstrap, current)
if !autoUpdateSpec {
autoUpdateSpec = shouldUpdateSpec(current)
}
updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec)
specChanged := autoUpdateSpec && wah.specsDiffer()
// specChanged := autoUpdateSpec && wah.specsDiffer()
specChanged := autoUpdateSpec && !objectOps.SpecEqual(bootstrap, current)
if !(updateAnnotation || specChanged) {
// the annotation key is up to date and the spec has not changed, no update is necessary
return nil, false, nil
return zero, false, nil
}
revised := wah.copyHave(specChanged)
var revised ObjectType
if specChanged {
revised = objectOps.ReplaceSpec(current, bootstrap)
} else {
revised = objectOps.DeepCopy(current)
}
if updateAnnotation {
setAutoUpdateAnnotation(revised, autoUpdateSpec)
}
@ -214,11 +242,9 @@ func setAutoUpdateAnnotation(accessor metav1.Object, autoUpdate bool) {
// EnsureConfigurations applies the given maintenance strategy to the given objects.
// At the first error, if any, it stops and returns that error.
func EnsureConfigurations(ctx context.Context, boots BootstrapObjects, strategy EnsureStrategy) error {
len := boots.len()
for i := 0; i < len; i++ {
bo := boots.get(i)
err := EnsureConfiguration(ctx, bo, strategy)
func EnsureConfigurations[ObjectType configurationObjectType](ctx context.Context, ops ObjectOps[ObjectType], boots []ObjectType, strategy EnsureStrategy[ObjectType]) error {
for _, bo := range boots {
err := EnsureConfiguration(ctx, ops, bo, strategy)
if err != nil {
return err
}
@ -227,65 +253,65 @@ func EnsureConfigurations(ctx context.Context, boots BootstrapObjects, strategy
}
// EnsureConfiguration applies the given maintenance strategy to the given object.
func EnsureConfiguration(ctx context.Context, bootstrap bootstrapObject, strategy EnsureStrategy) error {
name := bootstrap.getName()
func EnsureConfiguration[ObjectType configurationObjectType](ctx context.Context, ops ObjectOps[ObjectType], bootstrap ObjectType, strategy EnsureStrategy[ObjectType]) error {
name := bootstrap.GetName()
configurationType := strategy.Name()
var wah wantAndHave
var current ObjectType
var err error
for {
wah, err = bootstrap.getCurrent()
current, err = ops.Get(name)
if err == nil {
break
}
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
}
// we always re-create a missing configuration object
if err = bootstrap.create(ctx); err == nil {
klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", bootstrap.typeName()), "type", configurationType, "name", name)
if _, err = ops.Create(ctx, bootstrap, metav1.CreateOptions{FieldManager: fieldManager}); err == nil {
klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
return nil
}
if !apierrors.IsAlreadyExists(err) {
return fmt.Errorf("cannot create %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
return fmt.Errorf("cannot create %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
}
klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", bootstrap.typeName()), "type", configurationType, "name", name)
klog.V(5).InfoS(fmt.Sprintf("Something created the %s concurrently", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
}
klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", bootstrap.typeName()), "type", configurationType, "name", name)
newObject, update, err := strategy.ShouldUpdate(wah)
klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
newObject, update, err := strategy.ReviseIfNeeded(ops, current, bootstrap)
if err != nil {
return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
}
if !update {
if klogV := klog.V(5); klogV.Enabled() {
klogV.InfoS("No update required", "wrapper", bootstrap.typeName(), "type", configurationType, "name", name,
"diff", cmp.Diff(wah.getHave(), wah.getWant()))
klogV.InfoS("No update required", "wrapper", bootstrap.GetObjectKind().GroupVersionKind().Kind, "type", configurationType, "name", name,
"diff", cmp.Diff(current, bootstrap))
}
return nil
}
if err = newObject.update(ctx); err == nil {
klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", bootstrap.typeName(), configurationType, name, cmp.Diff(wah.getHave(), wah.getWant()))
if _, err = ops.Update(ctx, newObject, metav1.UpdateOptions{FieldManager: fieldManager}); err == nil {
klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, cmp.Diff(current, bootstrap))
return nil
}
if apierrors.IsConflict(err) {
klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", bootstrap.typeName()), "type", configurationType, "name", name)
klog.V(2).InfoS(fmt.Sprintf("Something updated the %s concurrently, I will check its spec later", bootstrap.GetObjectKind().GroupVersionKind().Kind), "type", configurationType, "name", name)
return nil
}
return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", bootstrap.typeName(), configurationType, name, err)
return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", bootstrap.GetObjectKind().GroupVersionKind().Kind, configurationType, name, err)
}
// RemoveUnwantedObjects attempts to delete the configuration objects
// that exist, are annotated `apf.kubernetes.io/autoupdate-spec=true`, and do not
// have a name in the given set. A refusal due to concurrent update is logged
// and not considered an error; the object will be reconsidered later.
func RemoveUnwantedObjects(ctx context.Context, boots BootstrapObjects) error {
current, err := boots.getExistingObjects()
func RemoveUnwantedObjects[ObjectType configurationObjectType](ctx context.Context, objectOps ObjectOps[ObjectType], boots []ObjectType) error {
current, err := objectOps.List(labels.Everything())
if err != nil {
return err
}
@ -316,9 +342,9 @@ func RemoveUnwantedObjects(ctx context.Context, boots BootstrapObjects) error {
continue
}
// TODO: expectedResourceVersion := object.GetResourceVersion()
err = object.delete(ctx /* TODO: expectedResourceVersion */)
err = objectOps.Delete(ctx, object.GetName(), metav1.DeleteOptions{ /* TODO: expectedResourceVersion */ })
if err == nil {
klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the unwanted %s", boots.typeName()), "name", name)
klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the unwanted %s", object.GetObjectKind().GroupVersionKind().Kind), "name", name)
continue
}
if apierrors.IsNotFound(err) {
@ -330,12 +356,10 @@ func RemoveUnwantedObjects(ctx context.Context, boots BootstrapObjects) error {
return nil
}
func namesOfBootstrapObjects(bos BootstrapObjects) sets.String {
func namesOfBootstrapObjects[ObjectType configurationObjectType](bos []ObjectType) sets.String {
names := sets.NewString()
len := bos.len()
for i := 0; i < len; i++ {
bo := bos.get(i)
names.Insert(bo.getName())
for _, bo := range bos {
names.Insert(bo.GetName())
}
return names
}

View File

@ -21,6 +21,7 @@ import (
"fmt"
"time"
flowcontrolv1beta3 "k8s.io/api/flowcontrol/v1beta3"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
@ -210,23 +211,23 @@ func ensure(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3I
}
func ensureSuggestedConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
plcSuggesteds := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
if err := ensurer.EnsureConfigurations(ctx, plcSuggesteds, ensurer.NewSuggestedEnsureStrategy()); err != nil {
plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
if err := ensurer.EnsureConfigurations(ctx, plcOps, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations, ensurer.NewSuggestedEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration]()); err != nil {
return err
}
fsSuggesteds := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, flowcontrolbootstrap.SuggestedFlowSchemas)
return ensurer.EnsureConfigurations(ctx, fsSuggesteds, ensurer.NewSuggestedEnsureStrategy())
fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
return ensurer.EnsureConfigurations(ctx, fsOps, flowcontrolbootstrap.SuggestedFlowSchemas, ensurer.NewSuggestedEnsureStrategy[*flowcontrolv1beta3.FlowSchema]())
}
func ensureMandatoryConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
plcMandatories := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, flowcontrolbootstrap.MandatoryPriorityLevelConfigurations)
if err := ensurer.EnsureConfigurations(ctx, plcMandatories, ensurer.NewMandatoryEnsureStrategy()); err != nil {
plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
if err := ensurer.EnsureConfigurations(ctx, plcOps, flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, ensurer.NewMandatoryEnsureStrategy[*flowcontrolv1beta3.PriorityLevelConfiguration]()); err != nil {
return err
}
fsMandatories := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, flowcontrolbootstrap.MandatoryFlowSchemas)
return ensurer.EnsureConfigurations(ctx, fsMandatories, ensurer.NewMandatoryEnsureStrategy())
fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
return ensurer.EnsureConfigurations(ctx, fsOps, flowcontrolbootstrap.MandatoryFlowSchemas, ensurer.NewMandatoryEnsureStrategy[*flowcontrolv1beta3.FlowSchema]())
}
func removeDanglingBootstrapConfiguration(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
@ -239,14 +240,14 @@ func removeDanglingBootstrapConfiguration(ctx context.Context, clientset flowcon
func removeDanglingBootstrapFlowSchema(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, fsLister flowcontrollisters.FlowSchemaLister) error {
bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...)
fsBoots := ensurer.WrapBootstrapFlowSchemas(clientset.FlowSchemas(), fsLister, bootstrap)
return ensurer.RemoveUnwantedObjects(ctx, fsBoots)
fsOps := ensurer.NewFlowSchemaOps(clientset.FlowSchemas(), fsLister)
return ensurer.RemoveUnwantedObjects(ctx, fsOps, bootstrap)
}
func removeDanglingBootstrapPriorityLevel(ctx context.Context, clientset flowcontrolclient.FlowcontrolV1beta3Interface, plcLister flowcontrollisters.PriorityLevelConfigurationLister) error {
bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...)
plcBoots := ensurer.WrapBootstrapPriorityLevelConfigurations(clientset.PriorityLevelConfigurations(), plcLister, bootstrap)
return ensurer.RemoveUnwantedObjects(ctx, plcBoots)
plcOps := ensurer.NewPriorityLevelConfigurationOps(clientset.PriorityLevelConfigurations(), plcLister)
return ensurer.RemoveUnwantedObjects(ctx, plcOps, bootstrap)
}
// contextFromChannelAndMaxWaitDuration returns a Context that is bound to the