apf: always create missing bootstrap configuration object(s)

This commit is contained in:
Abu Kashem 2021-05-17 11:52:48 -04:00
parent 382a33986b
commit f9ee64007e
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
7 changed files with 21 additions and 113 deletions

View File

@ -46,13 +46,12 @@ type FlowSchemaRemover interface {
// NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that // NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of suggested FlowSchema configuration objects. // can be used to ensure a set of suggested FlowSchema configuration objects.
// shouldCreate indicates whether a missing 'suggested' FlowSchema object should be recreated. func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, shouldCreate bool) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{ wrapper := &flowSchemaWrapper{
client: client, client: client,
} }
return &fsEnsurer{ return &fsEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper, shouldCreate), strategy: newSuggestedEnsureStrategy(wrapper),
wrapper: wrapper, wrapper: wrapper,
} }
} }

View File

@ -43,27 +43,18 @@ func TestEnsureFlowSchema(t *testing.T) {
}{ }{
// for suggested configurations // for suggested configurations
{ {
name: "suggested flow schema does not exist and we should ensure - the object should be created", name: "suggested flow schema does not exist - the object should always be re-created",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client, true) return NewSuggestedFlowSchemaEnsurer(client)
}, },
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: nil, current: nil,
expected: newFlowSchema("fs1", "pl1", 100).Object(), expected: newFlowSchema("fs1", "pl1", 100).Object(),
}, },
{
name: "suggested flow schema does not exist and we should not ensure - the object should not be created",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client, false)
},
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: nil,
expected: nil,
},
{ {
name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated", name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client, true) return NewSuggestedFlowSchemaEnsurer(client)
}, },
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
@ -72,7 +63,7 @@ func TestEnsureFlowSchema(t *testing.T) {
{ {
name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated", name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer { strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client, true) return NewSuggestedFlowSchemaEnsurer(client)
}, },
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(), bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(), current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
@ -223,7 +214,7 @@ func TestSuggestedFSEnsureStrategy_ShouldUpdate(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
strategy := newSuggestedEnsureStrategy(&flowSchemaWrapper{}, false) strategy := newSuggestedEnsureStrategy(&flowSchemaWrapper{})
newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap)
if err != nil { if err != nil {
t.Errorf("Expected no error, but got: %v", err) t.Errorf("Expected no error, but got: %v", err)

View File

@ -46,13 +46,12 @@ type PriorityLevelRemover interface {
// NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that // NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that
// can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects. // can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects.
// shouldCreate indicates whether a missing 'suggested' PriorityLevelConfiguration object should be recreated. func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, shouldCreate bool) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{ wrapper := &priorityLevelConfigurationWrapper{
client: client, client: client,
} }
return &plEnsurer{ return &plEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper, shouldCreate), strategy: newSuggestedEnsureStrategy(wrapper),
wrapper: wrapper, wrapper: wrapper,
} }
} }

View File

@ -43,27 +43,18 @@ func TestEnsurePriorityLevel(t *testing.T) {
}{ }{
// for suggested configurations // for suggested configurations
{ {
name: "suggested priority level configuration does not exist and we should ensure - new object should be created", name: "suggested priority level configuration does not exist - the object should always be re-created",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client, true) return NewSuggestedPriorityLevelEnsurerEnsurer(client)
}, },
bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(),
current: nil, current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).Object(), expected: newPLConfiguration("pl1").WithLimited(10).Object(),
}, },
{
name: "suggested priority level configuration does not exist and we should not ensure - new object should not be created",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client, false)
},
bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(),
current: nil,
expected: nil,
},
{ {
name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated", name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client, true) return NewSuggestedPriorityLevelEnsurerEnsurer(client)
}, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), }, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(), expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
@ -71,7 +62,7 @@ func TestEnsurePriorityLevel(t *testing.T) {
{ {
name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated", name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer { strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client, true) return NewSuggestedPriorityLevelEnsurerEnsurer(client)
}, },
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(), bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(), current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
@ -223,7 +214,7 @@ func TestSuggestedPLEnsureStrategy_ShouldUpdate(t *testing.T) {
for _, test := range tests { for _, test := range tests {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
strategy := newSuggestedEnsureStrategy(&priorityLevelConfigurationWrapper{}, false) strategy := newSuggestedEnsureStrategy(&priorityLevelConfigurationWrapper{})
newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap) newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap)
if err != nil { if err != nil {
t.Errorf("Expected no error, but got: %v", err) t.Errorf("Expected no error, but got: %v", err)

View File

@ -49,9 +49,6 @@ type ensureStrategy interface {
// This comes handy in logging. // This comes handy in logging.
Name() string Name() string
// ShouldCreate returns true if a missing configuration object should be recreated.
ShouldCreate() bool
// ShouldUpdate accepts the current and the bootstrap configuration and determines // ShouldUpdate accepts the current and the bootstrap configuration and determines
// whether an update is necessary. // whether an update is necessary.
// current is the existing in-cluster configuration object. // current is the existing in-cluster configuration object.
@ -107,10 +104,9 @@ type configurationObject interface {
runtime.Object runtime.Object
} }
func newSuggestedEnsureStrategy(copier specCopier, shouldCreate bool) ensureStrategy { func newSuggestedEnsureStrategy(copier specCopier) ensureStrategy {
return &strategy{ return &strategy{
copier: copier, copier: copier,
shouldCreate: shouldCreate,
alwaysAutoUpdateSpec: false, alwaysAutoUpdateSpec: false,
name: "suggested", name: "suggested",
} }
@ -119,7 +115,6 @@ func newSuggestedEnsureStrategy(copier specCopier, shouldCreate bool) ensureStra
func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy { func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy {
return &strategy{ return &strategy{
copier: copier, copier: copier,
shouldCreate: true,
alwaysAutoUpdateSpec: true, alwaysAutoUpdateSpec: true,
name: "mandatory", name: "mandatory",
} }
@ -128,7 +123,6 @@ func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy {
// auto-update strategy for the configuration objects // auto-update strategy for the configuration objects
type strategy struct { type strategy struct {
copier specCopier copier specCopier
shouldCreate bool
alwaysAutoUpdateSpec bool alwaysAutoUpdateSpec bool
name string name string
} }
@ -137,10 +131,6 @@ func (s *strategy) Name() string {
return s.name return s.name
} }
func (s *strategy) ShouldCreate() bool {
return s.shouldCreate
}
func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime.Object, bool, error) { func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime.Object, bool, error) {
if current == nil || bootstrap == nil { if current == nil || bootstrap == nil {
return nil, false, nil return nil, false, nil
@ -235,16 +225,12 @@ func ensureConfiguration(wrapper configurationWrapper, strategy ensureStrategy,
return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
} }
if strategy.ShouldCreate() { // we always re-create a missing configuration object
if _, err := wrapper.Create(bootstrap); err != nil { if _, err := wrapper.Create(bootstrap); err != nil {
return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err) return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
}
klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name)
return nil
} }
klog.V(5).InfoS(fmt.Sprintf("Skipping creation of %s", wrapper.TypeName()), "type", configurationType, "name", name) klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name)
return nil return nil
} }

View File

@ -21,8 +21,6 @@ import (
"fmt" "fmt"
"time" "time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" flowcontrolbootstrap "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/generic"
@ -153,8 +151,6 @@ func ensure(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error {
// We should not attempt creation of mandatory objects if ensuring the suggested // We should not attempt creation of mandatory objects if ensuring the suggested
// configuration resulted in an error. // configuration resulted in an error.
// This only happens when the stop channel is closed. // This only happens when the stop channel is closed.
// We rely on the presence of the "exempt" priority level configuration object in the cluster
// to indicate whether we should ensure suggested configuration.
return fmt.Errorf("failed ensuring suggested settings - %w", err) return fmt.Errorf("failed ensuring suggested settings - %w", err)
} }
@ -169,30 +165,13 @@ func ensure(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error {
return nil return nil
} }
// shouldCreateSuggested checks if the exempt priority level exists and returns
// whether the suggested flow schemas and priority levels should be ensured.
func shouldCreateSuggested(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface) (bool, error) {
if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), flowcontrol.PriorityLevelConfigurationNameExempt, metav1.GetOptions{}); err != nil {
if apierrors.IsNotFound(err) {
return true, nil
}
return false, err
}
return false, nil
}
func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error { func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error {
shouldCreateSuggested, err := shouldCreateSuggested(clientset) fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas())
if err != nil {
return fmt.Errorf("failed to determine whether suggested configuration should be created - error: %w", err)
}
fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas(), shouldCreateSuggested)
if err := fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas); err != nil { if err := fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas); err != nil {
return err return err
} }
plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations(), shouldCreateSuggested) plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations())
return plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations) return plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
} }

View File

@ -20,45 +20,8 @@ import (
"context" "context"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake"
) )
func TestShouldEnsurePredefinedSettings(t *testing.T) {
testCases := []struct {
name string
existingPriorityLevel *flowcontrolv1beta1.PriorityLevelConfiguration
expected bool
}{
{
name: "should ensure if exempt priority-level is absent",
existingPriorityLevel: nil,
expected: true,
},
{
name: "should not ensure if exempt priority-level is present",
existingPriorityLevel: bootstrap.MandatoryPriorityLevelConfigurationExempt,
expected: false,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
c := fake.NewSimpleClientset()
if testCase.existingPriorityLevel != nil {
c.FlowcontrolV1beta1().PriorityLevelConfigurations().Create(context.TODO(), testCase.existingPriorityLevel, metav1.CreateOptions{})
}
should, err := shouldCreateSuggested(c.FlowcontrolV1beta1())
assert.NoError(t, err)
assert.Equal(t, testCase.expected, should)
})
}
}
func TestContextFromChannelAndMaxWaitDurationWithChannelClosed(t *testing.T) { func TestContextFromChannelAndMaxWaitDurationWithChannelClosed(t *testing.T) {
stopCh := make(chan struct{}) stopCh := make(chan struct{})
ctx, cancel := contextFromChannelAndMaxWaitDuration(stopCh, time.Hour) ctx, cancel := contextFromChannelAndMaxWaitDuration(stopCh, time.Hour)