add auto update for apf bootstrap configuration

Take the following approach:
On a fresh install, all bootstrap configuration objects will
have auto update enabled via the following annotation :
`apf.kubernetes.io/autoupdate: 'true'`

The kube-apiserver periodically checks the bootstrap configuration
objects on the cluster and applies update if necessary.

We enforce an 'always auto-update' policy for the mandatory
configuration object(s).

We update the suggested configuration objects when:
- auto update is enabled (`apf.kubernetes.io/autoupdate: 'true'`) or
- auto update annotation key is missing but `generation` is `1`

If the configuration object is missing the annotation key, we add
it appropriately:
it is set to `true` if `generation` is `1`, `false` otherwise.

The above approach ensures that we don't squash changes made by an
operator. Please note, we can't protect the changes made by the
operator in the following scenario:
- the user changes the spec and then deletes and recreates
  the same object. (generation resets to 1)

remove using a marker
This commit is contained in:
Abu Kashem 2021-01-12 16:12:13 -05:00
parent a5cf298a95
commit 759a64136b
No known key found for this signature in database
GPG Key ID: 33A4FA7088DB68A9
9 changed files with 1949 additions and 267 deletions

View File

@ -0,0 +1,199 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ensurer
import (
"context"
"errors"
"fmt"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
)
var (
errObjectNotFlowSchema = errors.New("object is not a FlowSchema type")
)
// FlowSchemaEnsurer ensures the specified bootstrap configuration objects
type FlowSchemaEnsurer interface {
Ensure([]*flowcontrolv1beta1.FlowSchema) error
}
// FlowSchemaRemover removes the specified bootstrap configuration objects
type FlowSchemaRemover interface {
Remove([]string) error
}
// NewSuggestedFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of suggested FlowSchema configuration objects.
// shouldCreate indicates whether a missing 'suggested' FlowSchema object should be recreated.
func NewSuggestedFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface, shouldCreate bool) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client,
}
return &fsEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper, shouldCreate),
wrapper: wrapper,
}
}
// NewMandatoryFlowSchemaEnsurer returns a FlowSchemaEnsurer instance that
// can be used to ensure a set of mandatory FlowSchema configuration objects.
func NewMandatoryFlowSchemaEnsurer(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
wrapper := &flowSchemaWrapper{
client: client,
}
return &fsEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
wrapper: wrapper,
}
}
// NewFlowSchemaRemover returns a FlowSchemaRemover instance that
// can be used to remove a set of FlowSchema configuration objects.
func NewFlowSchemaRemover(client flowcontrolclient.FlowSchemaInterface) FlowSchemaRemover {
return &fsEnsurer{
wrapper: &flowSchemaWrapper{
client: client,
},
}
}
// GetFlowSchemaRemoveCandidate returns a list of FlowSchema object
// names that are candidates for deletion from the cluster.
// bootstrap: a set of hard coded FlowSchema configuration objects
// kube-apiserver maintains in-memory.
func GetFlowSchemaRemoveCandidate(client flowcontrolclient.FlowSchemaInterface, bootstrap []*flowcontrolv1beta1.FlowSchema) ([]string, error) {
// TODO(101667): Use a lister here to avoid periodic LIST calls
fsList, err := client.List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list FlowSchema - %w", err)
}
bootstrapNames := sets.String{}
for i := range bootstrap {
bootstrapNames.Insert(bootstrap[i].GetName())
}
currentObjects := make([]metav1.Object, len(fsList.Items))
for i := range fsList.Items {
currentObjects[i] = &fsList.Items[i]
}
return getRemoveCandidate(bootstrapNames, currentObjects), nil
}
type fsEnsurer struct {
strategy ensureStrategy
wrapper configurationWrapper
}
func (e *fsEnsurer) Ensure(flowSchemas []*flowcontrolv1beta1.FlowSchema) error {
for _, flowSchema := range flowSchemas {
if err := ensureConfiguration(e.wrapper, e.strategy, flowSchema); err != nil {
return err
}
}
return nil
}
func (e *fsEnsurer) Remove(flowSchemas []string) error {
for _, flowSchema := range flowSchemas {
if err := removeConfiguration(e.wrapper, flowSchema); err != nil {
return err
}
}
return nil
}
// flowSchemaWrapper abstracts all FlowSchema specific logic, with this
// we can manage all boiler plate code in one place.
type flowSchemaWrapper struct {
client flowcontrolclient.FlowSchemaInterface
}
func (fs *flowSchemaWrapper) TypeName() string {
return "FlowSchema"
}
func (fs *flowSchemaWrapper) Create(object runtime.Object) (runtime.Object, error) {
fsObject, ok := object.(*flowcontrolv1beta1.FlowSchema)
if !ok {
return nil, errObjectNotFlowSchema
}
return fs.client.Create(context.TODO(), fsObject, metav1.CreateOptions{FieldManager: fieldManager})
}
func (fs *flowSchemaWrapper) Update(object runtime.Object) (runtime.Object, error) {
fsObject, ok := object.(*flowcontrolv1beta1.FlowSchema)
if !ok {
return nil, errObjectNotFlowSchema
}
return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager})
}
func (fs *flowSchemaWrapper) Get(name string) (configurationObject, error) {
return fs.client.Get(context.TODO(), name, metav1.GetOptions{})
}
func (fs *flowSchemaWrapper) Delete(name string) error {
return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{})
}
func (fs *flowSchemaWrapper) CopySpec(bootstrap, current runtime.Object) error {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta1.FlowSchema)
if !ok {
return errObjectNotFlowSchema
}
currentFS, ok := current.(*flowcontrolv1beta1.FlowSchema)
if !ok {
return errObjectNotFlowSchema
}
specCopy := bootstrapFS.Spec.DeepCopy()
currentFS.Spec = *specCopy
return nil
}
func (fs *flowSchemaWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta1.FlowSchema)
if !ok {
return false, errObjectNotFlowSchema
}
currentFS, ok := current.(*flowcontrolv1beta1.FlowSchema)
if !ok {
return false, errObjectNotFlowSchema
}
return flowSchemaSpecChanged(bootstrapFS, currentFS), nil
}
func flowSchemaSpecChanged(expected, actual *flowcontrolv1beta1.FlowSchema) bool {
copiedExpectedFlowSchema := expected.DeepCopy()
flowcontrolapisv1beta1.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec)
}

View File

@ -0,0 +1,478 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ensurer
import (
"context"
"reflect"
"testing"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
)
func TestEnsureFlowSchema(t *testing.T) {
tests := []struct {
name string
strategy func(flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer
current *flowcontrolv1beta1.FlowSchema
bootstrap *flowcontrolv1beta1.FlowSchema
expected *flowcontrolv1beta1.FlowSchema
}{
// for suggested configurations
{
name: "suggested flow schema does not exist and we should ensure - the object should be created",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client, true)
},
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: nil,
expected: newFlowSchema("fs1", "pl1", 100).Object(),
},
{
name: "suggested flow schema does not exist and we should not ensure - the object should not be created",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client, false)
},
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: nil,
expected: nil,
},
{
name: "suggested flow schema exists, auto update is enabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client, true)
},
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "suggested flow schema exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewSuggestedFlowSchemaEnsurer(client, true)
},
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
},
// for mandatory configurations
{
name: "mandatory flow schema does not exist - new object should be created",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewMandatoryFlowSchemaEnsurer(client)
},
bootstrap: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
current: nil,
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory flow schema exists, annotation is missing - annotation should be added",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewMandatoryFlowSchemaEnsurer(client)
},
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 100).Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory flow schema exists, auto update is disabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.FlowSchemaInterface) FlowSchemaEnsurer {
return NewMandatoryFlowSchemaEnsurer(client)
},
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
expected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta1().FlowSchemas()
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
}
ensurer := test.strategy(client)
err := ensurer.Ensure([]*flowcontrolv1beta1.FlowSchema{test.bootstrap})
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
fsGot, err := client.Get(context.TODO(), test.bootstrap.Name, metav1.GetOptions{})
switch {
case test.expected == nil:
if !apierrors.IsNotFound(err) {
t.Fatalf("Expected GET to return an %q error, but got: %v", metav1.StatusReasonNotFound, err)
}
case err != nil:
t.Fatalf("Expected GET to return no error, but got: %v", err)
}
if !reflect.DeepEqual(test.expected, fsGot) {
t.Errorf("FlowSchema does not match - diff: %s", cmp.Diff(test.expected, fsGot))
}
})
}
}
func TestSuggestedFSEnsureStrategy_ShouldUpdate(t *testing.T) {
tests := []struct {
name string
current *flowcontrolv1beta1.FlowSchema
bootstrap *flowcontrolv1beta1.FlowSchema
newObjectExpected *flowcontrolv1beta1.FlowSchema
}{
{
name: "auto update is enabled, first generation, spec does not match - spec update expected",
current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(),
bootstrap: newFlowSchema("fs1", "pl1", 200).Object(),
newObjectExpected: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(),
},
{
name: "auto update is enabled, first generation, spec matches - no update expected",
current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(),
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
newObjectExpected: nil,
},
{
name: "auto update is enabled, second generation, spec does not match - spec update expected",
current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(2).Object(),
bootstrap: newFlowSchema("fs1", "pl2", 200).Object(),
newObjectExpected: newFlowSchema("fs1", "pl2", 200).WithAutoUpdateAnnotation("true").WithGeneration(2).Object(),
},
{
name: "auto update is enabled, second generation, spec matches - no update expected",
current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(2).Object(),
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
newObjectExpected: nil,
},
{
name: "auto update is disabled, first generation, spec does not match - no update expected",
current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(1).Object(),
bootstrap: newFlowSchema("fs1", "pl1", 200).Object(),
newObjectExpected: nil,
},
{
name: "auto update is disabled, first generation, spec matches - no update expected",
current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(1).Object(),
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
newObjectExpected: nil,
},
{
name: "auto update is disabled, second generation, spec does not match - no update expected",
current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(2).Object(),
bootstrap: newFlowSchema("fs1", "pl2", 200).Object(),
newObjectExpected: nil,
},
{
name: "auto update is disabled, second generation, spec matches - no update expected",
current: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(2).Object(),
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
newObjectExpected: nil,
},
{
name: "annotation is missing, first generation, spec does not match - both annotation and spec update expected",
current: newFlowSchema("fs1", "pl1", 100).WithGeneration(1).Object(),
bootstrap: newFlowSchema("fs1", "pl2", 200).Object(),
newObjectExpected: newFlowSchema("fs1", "pl2", 200).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(),
},
{
name: "annotation is missing, first generation, spec matches - annotation update is expected",
current: newFlowSchema("fs1", "pl1", 100).WithGeneration(1).Object(),
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
newObjectExpected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").WithGeneration(1).Object(),
},
{
name: "annotation is missing, second generation, spec does not match - annotation update is expected",
current: newFlowSchema("fs1", "pl1", 100).WithGeneration(2).Object(),
bootstrap: newFlowSchema("fs1", "pl2", 200).Object(),
newObjectExpected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(2).Object(),
},
{
name: "annotation is missing, second generation, spec matches - annotation update is expected",
current: newFlowSchema("fs1", "pl1", 100).WithGeneration(2).Object(),
bootstrap: newFlowSchema("fs1", "pl1", 100).Object(),
newObjectExpected: newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("false").WithGeneration(2).Object(),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
strategy := newSuggestedEnsureStrategy(&flowSchemaWrapper{}, false)
newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap)
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
}
if test.newObjectExpected == nil {
if newObjectGot != nil {
t.Errorf("Expected a nil object, but got: %#v", newObjectGot)
}
if updateGot {
t.Errorf("Expected update=%t but got: %t", false, updateGot)
}
return
}
if !updateGot {
t.Errorf("Expected update=%t but got: %t", true, updateGot)
}
if !reflect.DeepEqual(test.newObjectExpected, newObjectGot) {
t.Errorf("Expected the object to be updated to match - diff: %s", cmp.Diff(test.newObjectExpected, newObjectGot))
}
})
}
}
func TestFlowSchemaSpecChanged(t *testing.T) {
fs1 := &flowcontrolv1beta1.FlowSchema{
Spec: flowcontrolv1beta1.FlowSchemaSpec{},
}
fs2 := &flowcontrolv1beta1.FlowSchema{
Spec: flowcontrolv1beta1.FlowSchemaSpec{
MatchingPrecedence: 1,
},
}
fs1Defaulted := &flowcontrolv1beta1.FlowSchema{
Spec: flowcontrolv1beta1.FlowSchemaSpec{
MatchingPrecedence: flowcontrolapisv1beta1.FlowSchemaDefaultMatchingPrecedence,
},
}
testCases := []struct {
name string
expected *flowcontrolv1beta1.FlowSchema
actual *flowcontrolv1beta1.FlowSchema
specChanged bool
}{
{
name: "identical flow-schemas should work",
expected: bootstrap.MandatoryFlowSchemaCatchAll,
actual: bootstrap.MandatoryFlowSchemaCatchAll,
specChanged: false,
},
{
name: "defaulted flow-schemas should work",
expected: fs1,
actual: fs1Defaulted,
specChanged: false,
},
{
name: "non-defaulted flow-schema has wrong spec",
expected: fs1,
actual: fs2,
specChanged: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
w := flowSchemaSpecChanged(testCase.expected, testCase.actual)
assert.Equal(t, testCase.specChanged, w)
})
}
}
func TestRemoveFlowSchema(t *testing.T) {
tests := []struct {
name string
current *flowcontrolv1beta1.FlowSchema
bootstrapName string
removeExpected bool
}{
{
name: "flow schema does not exist",
bootstrapName: "fs1",
current: nil,
},
{
name: "flow schema exists, auto update is enabled",
bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("true").Object(),
removeExpected: true,
},
{
name: "flow schema exists, auto update is disabled",
bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("false").Object(),
removeExpected: false,
},
{
name: "flow schema exists, the auto-update annotation is malformed",
bootstrapName: "fs1",
current: newFlowSchema("fs1", "pl1", 200).WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta1().FlowSchemas()
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
}
remover := NewFlowSchemaRemover(client)
err := remover.Remove([]string{test.bootstrapName})
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if test.current == nil {
return
}
_, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{})
switch {
case test.removeExpected:
if !apierrors.IsNotFound(err) {
t.Errorf("Expected error: %q, but got: %v", metav1.StatusReasonNotFound, err)
}
default:
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
}
}
})
}
}
func TestGetFlowSchemaRemoveCandidate(t *testing.T) {
tests := []struct {
name string
current []*flowcontrolv1beta1.FlowSchema
bootstrap []*flowcontrolv1beta1.FlowSchema
expected []string
}{
{
name: "no object has been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta1.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta1.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{},
},
{
name: "bootstrap is empty, all current objects with the annotation should be candidates",
bootstrap: []*flowcontrolv1beta1.FlowSchema{},
current: []*flowcontrolv1beta1.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).Object(),
},
expected: []string{"fs1", "fs2"},
},
{
name: "object(s) have been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta1.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta1.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs3", "pl3", 300).WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{"fs2", "fs3"},
},
{
name: "object(s) without the annotation key are ignored",
bootstrap: []*flowcontrolv1beta1.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta1.FlowSchema{
newFlowSchema("fs1", "pl1", 100).WithAutoUpdateAnnotation("true").Object(),
newFlowSchema("fs2", "pl2", 200).Object(),
newFlowSchema("fs3", "pl3", 300).Object(),
},
expected: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta1().FlowSchemas()
for i := range test.current {
client.Create(context.TODO(), test.current[i], metav1.CreateOptions{})
}
removeListGot, err := GetFlowSchemaRemoveCandidate(client, test.bootstrap)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !cmp.Equal(test.expected, removeListGot) {
t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot))
}
})
}
}
type fsBuilder struct {
object *flowcontrolv1beta1.FlowSchema
}
func newFlowSchema(name, plName string, matchingPrecedence int32) *fsBuilder {
return &fsBuilder{
object: &flowcontrolv1beta1.FlowSchema{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: flowcontrolv1beta1.FlowSchemaSpec{
PriorityLevelConfiguration: flowcontrolv1beta1.PriorityLevelConfigurationReference{
Name: plName,
},
MatchingPrecedence: matchingPrecedence,
},
},
}
}
func (b *fsBuilder) Object() *flowcontrolv1beta1.FlowSchema {
return b.object
}
func (b *fsBuilder) WithGeneration(value int64) *fsBuilder {
b.object.SetGeneration(value)
return b
}
func (b *fsBuilder) WithAutoUpdateAnnotation(value string) *fsBuilder {
setAnnotation(b.object, value)
return b
}
func setAnnotation(accessor metav1.Object, value string) {
if accessor.GetAnnotations() == nil {
accessor.SetAnnotations(map[string]string{})
}
accessor.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey] = value
}

View File

@ -0,0 +1,199 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ensurer
import (
"context"
"errors"
"fmt"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
)
var (
errObjectNotPriorityLevel = errors.New("object is not a PriorityLevelConfiguration type")
)
// PriorityLevelEnsurer ensures the specified bootstrap configuration objects
type PriorityLevelEnsurer interface {
Ensure([]*flowcontrolv1beta1.PriorityLevelConfiguration) error
}
// PriorityLevelRemover removes the specified bootstrap configuration objects
type PriorityLevelRemover interface {
Remove([]string) error
}
// NewSuggestedPriorityLevelEnsurerEnsurer returns a PriorityLevelEnsurer instance that
// can be used to ensure a set of suggested PriorityLevelConfiguration configuration objects.
// shouldCreate indicates whether a missing 'suggested' PriorityLevelConfiguration object should be recreated.
func NewSuggestedPriorityLevelEnsurerEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface, shouldCreate bool) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{
client: client,
}
return &plEnsurer{
strategy: newSuggestedEnsureStrategy(wrapper, shouldCreate),
wrapper: wrapper,
}
}
// NewMandatoryPriorityLevelEnsurer returns a PriorityLevelEnsurer instance that
// can be used to ensure a set of mandatory PriorityLevelConfiguration configuration objects.
func NewMandatoryPriorityLevelEnsurer(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
wrapper := &priorityLevelConfigurationWrapper{
client: client,
}
return &plEnsurer{
strategy: newMandatoryEnsureStrategy(wrapper),
wrapper: wrapper,
}
}
// NewPriorityLevelRemover returns a PriorityLevelRemover instance that
// can be used to remove a set of PriorityLevelConfiguration configuration objects.
func NewPriorityLevelRemover(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelRemover {
return &plEnsurer{
wrapper: &priorityLevelConfigurationWrapper{
client: client,
},
}
}
// GetPriorityLevelRemoveCandidate returns a list of PriorityLevelConfiguration
// names that are candidates for removal from the cluster.
// bootstrap: a set of hard coded PriorityLevelConfiguration configuration
// objects kube-apiserver maintains in-memory.
func GetPriorityLevelRemoveCandidate(client flowcontrolclient.PriorityLevelConfigurationInterface, bootstrap []*flowcontrolv1beta1.PriorityLevelConfiguration) ([]string, error) {
// TODO(101667): Use a lister here to avoid periodic LIST calls
plList, err := client.List(context.TODO(), metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to list PriorityLevelConfiguration - %w", err)
}
bootstrapNames := sets.String{}
for i := range bootstrap {
bootstrapNames.Insert(bootstrap[i].GetName())
}
currentObjects := make([]metav1.Object, len(plList.Items))
for i := range plList.Items {
currentObjects[i] = &plList.Items[i]
}
return getRemoveCandidate(bootstrapNames, currentObjects), nil
}
type plEnsurer struct {
strategy ensureStrategy
wrapper configurationWrapper
}
func (e *plEnsurer) Ensure(priorityLevels []*flowcontrolv1beta1.PriorityLevelConfiguration) error {
for _, priorityLevel := range priorityLevels {
if err := ensureConfiguration(e.wrapper, e.strategy, priorityLevel); err != nil {
return err
}
}
return nil
}
func (e *plEnsurer) Remove(priorityLevels []string) error {
for _, priorityLevel := range priorityLevels {
if err := removeConfiguration(e.wrapper, priorityLevel); err != nil {
return err
}
}
return nil
}
// priorityLevelConfigurationWrapper abstracts all PriorityLevelConfiguration specific logic,
// with this we can manage all boiler plate code in one place.
type priorityLevelConfigurationWrapper struct {
client flowcontrolclient.PriorityLevelConfigurationInterface
}
func (fs *priorityLevelConfigurationWrapper) TypeName() string {
return "PriorityLevelConfiguration"
}
func (fs *priorityLevelConfigurationWrapper) Create(object runtime.Object) (runtime.Object, error) {
plObject, ok := object.(*flowcontrolv1beta1.PriorityLevelConfiguration)
if !ok {
return nil, errObjectNotPriorityLevel
}
return fs.client.Create(context.TODO(), plObject, metav1.CreateOptions{FieldManager: fieldManager})
}
func (fs *priorityLevelConfigurationWrapper) Update(object runtime.Object) (runtime.Object, error) {
fsObject, ok := object.(*flowcontrolv1beta1.PriorityLevelConfiguration)
if !ok {
return nil, errObjectNotPriorityLevel
}
return fs.client.Update(context.TODO(), fsObject, metav1.UpdateOptions{FieldManager: fieldManager})
}
func (fs *priorityLevelConfigurationWrapper) Get(name string) (configurationObject, error) {
return fs.client.Get(context.TODO(), name, metav1.GetOptions{})
}
func (fs *priorityLevelConfigurationWrapper) Delete(name string) error {
return fs.client.Delete(context.TODO(), name, metav1.DeleteOptions{})
}
func (fs *priorityLevelConfigurationWrapper) CopySpec(bootstrap, current runtime.Object) error {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta1.PriorityLevelConfiguration)
if !ok {
return errObjectNotPriorityLevel
}
currentFS, ok := current.(*flowcontrolv1beta1.PriorityLevelConfiguration)
if !ok {
return errObjectNotPriorityLevel
}
specCopy := bootstrapFS.Spec.DeepCopy()
currentFS.Spec = *specCopy
return nil
}
func (fs *priorityLevelConfigurationWrapper) HasSpecChanged(bootstrap, current runtime.Object) (bool, error) {
bootstrapFS, ok := bootstrap.(*flowcontrolv1beta1.PriorityLevelConfiguration)
if !ok {
return false, errObjectNotPriorityLevel
}
currentFS, ok := current.(*flowcontrolv1beta1.PriorityLevelConfiguration)
if !ok {
return false, errObjectNotPriorityLevel
}
return priorityLevelSpecChanged(bootstrapFS, currentFS), nil
}
func priorityLevelSpecChanged(expected, actual *flowcontrolv1beta1.PriorityLevelConfiguration) bool {
copiedExpectedPriorityLevel := expected.DeepCopy()
flowcontrolapisv1beta1.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel)
return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec)
}

View File

@ -0,0 +1,508 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ensurer
import (
"context"
"reflect"
"testing"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake"
flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta1"
flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
)
func TestEnsurePriorityLevel(t *testing.T) {
tests := []struct {
name string
strategy func(flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer
current *flowcontrolv1beta1.PriorityLevelConfiguration
bootstrap *flowcontrolv1beta1.PriorityLevelConfiguration
expected *flowcontrolv1beta1.PriorityLevelConfiguration
}{
// for suggested configurations
{
name: "suggested priority level configuration does not exist and we should ensure - new object should be created",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client, true)
},
bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(),
current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).Object(),
},
{
name: "suggested priority level configuration does not exist and we should not ensure - new object should not be created",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client, false)
},
bootstrap: newPLConfiguration("pl1").WithLimited(10).Object(),
current: nil,
expected: nil,
},
{
name: "suggested priority level configuration exists, auto update is enabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client, true)
}, bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
},
{
name: "suggested priority level configuration exists, auto update is disabled, spec does not match - current object should not be updated",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewSuggestedPriorityLevelEnsurerEnsurer(client, true)
},
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
},
// for mandatory configurations
{
name: "mandatory priority level configuration does not exist - new object should be created",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewMandatoryPriorityLevelEnsurer(client)
},
bootstrap: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
current: nil,
expected: newPLConfiguration("pl1").WithLimited(10).WithAutoUpdateAnnotation("true").Object(),
},
{
name: "mandatory priority level configuration exists, annotation is missing - annotation is added",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewMandatoryPriorityLevelEnsurer(client)
},
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithLimited(20).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
},
{
name: "mandatory priority level configuration exists, auto update is disabled, spec does not match - current object should be updated",
strategy: func(client flowcontrolclient.PriorityLevelConfigurationInterface) PriorityLevelEnsurer {
return NewMandatoryPriorityLevelEnsurer(client)
},
bootstrap: newPLConfiguration("pl1").WithLimited(20).Object(),
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").WithLimited(10).Object(),
expected: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").WithLimited(20).Object(),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta1().PriorityLevelConfigurations()
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
}
ensurer := test.strategy(client)
err := ensurer.Ensure([]*flowcontrolv1beta1.PriorityLevelConfiguration{test.bootstrap})
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
plGot, err := client.Get(context.TODO(), test.bootstrap.Name, metav1.GetOptions{})
switch {
case test.expected == nil:
if !apierrors.IsNotFound(err) {
t.Fatalf("Expected GET to return an %q error, but got: %v", metav1.StatusReasonNotFound, err)
}
case err != nil:
t.Fatalf("Expected GET to return no error, but got: %v", err)
}
if !reflect.DeepEqual(test.expected, plGot) {
t.Errorf("PriorityLevelConfiguration does not match - diff: %s", cmp.Diff(test.expected, plGot))
}
})
}
}
func TestSuggestedPLEnsureStrategy_ShouldUpdate(t *testing.T) {
tests := []struct {
name string
current *flowcontrolv1beta1.PriorityLevelConfiguration
bootstrap *flowcontrolv1beta1.PriorityLevelConfiguration
newObjectExpected *flowcontrolv1beta1.PriorityLevelConfiguration
}{
{
name: "auto update is enabled, first generation, spec does not match - spec update expected",
current: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(10).Object(),
newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(10).Object(),
},
{
name: "auto update is enabled, first generation, spec matches - no update expected",
current: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithGeneration(1).WithLimited(5).Object(),
newObjectExpected: nil,
},
{
name: "auto update is enabled, second generation, spec does not match - spec update expected",
current: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(2).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(10).Object(),
newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(2).WithLimited(10).Object(),
},
{
name: "auto update is enabled, second generation, spec matches - no update expected",
current: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(2).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(5).Object(),
newObjectExpected: nil,
},
{
name: "auto update is disabled, first generation, spec does not match - no update expected",
current: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(1).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(10).Object(),
newObjectExpected: nil,
},
{
name: "auto update is disabled, first generation, spec matches - no update expected",
current: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(1).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(5).Object(),
newObjectExpected: nil,
},
{
name: "auto update is disabled, second generation, spec does not match - no update expected",
current: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(2).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(10).Object(),
newObjectExpected: nil,
},
{
name: "auto update is disabled, second generation, spec matches - no update expected",
current: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(2).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(5).Object(),
newObjectExpected: nil,
},
{
name: "annotation is missing, first generation, spec does not match - both annotation and spec update expected",
current: newPLConfiguration("foo").WithGeneration(1).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(10).Object(),
newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(10).Object(),
},
{
name: "annotation is missing, first generation, spec matches - annotation update is expected",
current: newPLConfiguration("foo").WithGeneration(1).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(5).Object(),
newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("true").WithGeneration(1).WithLimited(5).Object(),
},
{
name: "annotation is missing, second generation, spec does not match - annotation update is expected",
current: newPLConfiguration("foo").WithGeneration(2).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(10).Object(),
newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(2).WithLimited(5).Object(),
},
{
name: "annotation is missing, second generation, spec matches - annotation update is expected",
current: newPLConfiguration("foo").WithGeneration(2).WithLimited(5).Object(),
bootstrap: newPLConfiguration("foo").WithLimited(5).Object(),
newObjectExpected: newPLConfiguration("foo").WithAutoUpdateAnnotation("false").WithGeneration(2).WithLimited(5).Object(),
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
strategy := newSuggestedEnsureStrategy(&priorityLevelConfigurationWrapper{}, false)
newObjectGot, updateGot, err := strategy.ShouldUpdate(test.current, test.bootstrap)
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
}
if test.newObjectExpected == nil {
if newObjectGot != nil {
t.Errorf("Expected a nil object, but got: %#v", newObjectGot)
}
if updateGot {
t.Errorf("Expected update=%t but got: %t", false, updateGot)
}
return
}
if !updateGot {
t.Errorf("Expected update=%t but got: %t", true, updateGot)
}
if !reflect.DeepEqual(test.newObjectExpected, newObjectGot) {
t.Errorf("Expected the object to be updated to match - diff: %s", cmp.Diff(test.newObjectExpected, newObjectGot))
}
})
}
}
func TestPriorityLevelSpecChanged(t *testing.T) {
pl1 := &flowcontrolv1beta1.PriorityLevelConfiguration{
Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{
Type: flowcontrolv1beta1.PriorityLevelEnablementLimited,
Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{
LimitResponse: flowcontrolv1beta1.LimitResponse{
Type: flowcontrolv1beta1.LimitResponseTypeReject,
},
},
},
}
pl2 := &flowcontrolv1beta1.PriorityLevelConfiguration{
Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{
Type: flowcontrolv1beta1.PriorityLevelEnablementLimited,
Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: 1,
},
},
}
pl1Defaulted := &flowcontrolv1beta1.PriorityLevelConfiguration{
Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{
Type: flowcontrolv1beta1.PriorityLevelEnablementLimited,
Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: flowcontrolapisv1beta1.PriorityLevelConfigurationDefaultAssuredConcurrencyShares,
LimitResponse: flowcontrolv1beta1.LimitResponse{
Type: flowcontrolv1beta1.LimitResponseTypeReject,
},
},
},
}
testCases := []struct {
name string
expected *flowcontrolv1beta1.PriorityLevelConfiguration
actual *flowcontrolv1beta1.PriorityLevelConfiguration
specChanged bool
}{
{
name: "identical priority-level should work",
expected: bootstrap.MandatoryPriorityLevelConfigurationCatchAll,
actual: bootstrap.MandatoryPriorityLevelConfigurationCatchAll,
specChanged: false,
},
{
name: "defaulted priority-level should work",
expected: pl1,
actual: pl1Defaulted,
specChanged: false,
},
{
name: "non-defaulted priority-level has wrong spec",
expected: pl1,
actual: pl2,
specChanged: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
w := priorityLevelSpecChanged(testCase.expected, testCase.actual)
assert.Equal(t, testCase.specChanged, w)
})
}
}
func TestRemovePriorityLevelConfiguration(t *testing.T) {
tests := []struct {
name string
current *flowcontrolv1beta1.PriorityLevelConfiguration
bootstrapName string
removeExpected bool
}{
{
name: "priority level configuration does not exist",
bootstrapName: "pl1",
current: nil,
},
{
name: "priority level configuration exists, auto update is enabled",
bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
removeExpected: true,
},
{
name: "priority level configuration exists, auto update is disabled",
bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("false").Object(),
removeExpected: false,
},
{
name: "priority level configuration exists, the auto-update annotation is malformed",
bootstrapName: "pl1",
current: newPLConfiguration("pl1").WithAutoUpdateAnnotation("invalid").Object(),
removeExpected: false,
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta1().PriorityLevelConfigurations()
if test.current != nil {
client.Create(context.TODO(), test.current, metav1.CreateOptions{})
}
remover := NewPriorityLevelRemover(client)
err := remover.Remove([]string{test.bootstrapName})
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if test.current == nil {
return
}
_, err = client.Get(context.TODO(), test.bootstrapName, metav1.GetOptions{})
switch {
case test.removeExpected:
if !apierrors.IsNotFound(err) {
t.Errorf("Expected error: %q, but got: %v", metav1.StatusReasonNotFound, err)
}
default:
if err != nil {
t.Errorf("Expected no error, but got: %v", err)
}
}
})
}
}
func TestGetPriorityLevelRemoveCandidate(t *testing.T) {
tests := []struct {
name string
current []*flowcontrolv1beta1.PriorityLevelConfiguration
bootstrap []*flowcontrolv1beta1.PriorityLevelConfiguration
expected []string
}{
{
name: "no object has been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta1.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta1.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{},
},
{
name: "bootstrap is empty, all current objects with the annotation should be candidates",
bootstrap: []*flowcontrolv1beta1.PriorityLevelConfiguration{},
current: []*flowcontrolv1beta1.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").Object(),
},
expected: []string{"pl1", "pl2"},
},
{
name: "object(s) have been removed from the bootstrap configuration",
bootstrap: []*flowcontrolv1beta1.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta1.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl3").WithAutoUpdateAnnotation("true").Object(),
},
expected: []string{"pl2", "pl3"},
},
{
name: "object(s) without the annotation key are ignored",
bootstrap: []*flowcontrolv1beta1.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
},
current: []*flowcontrolv1beta1.PriorityLevelConfiguration{
newPLConfiguration("pl1").WithAutoUpdateAnnotation("true").Object(),
newPLConfiguration("pl2").Object(),
newPLConfiguration("pl3").Object(),
},
expected: []string{},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
client := fake.NewSimpleClientset().FlowcontrolV1beta1().PriorityLevelConfigurations()
for i := range test.current {
client.Create(context.TODO(), test.current[i], metav1.CreateOptions{})
}
removeListGot, err := GetPriorityLevelRemoveCandidate(client, test.bootstrap)
if err != nil {
t.Fatalf("Expected no error, but got: %v", err)
}
if !cmp.Equal(test.expected, removeListGot) {
t.Errorf("Remove candidate list does not match - diff: %s", cmp.Diff(test.expected, removeListGot))
}
})
}
}
type plBuilder struct {
object *flowcontrolv1beta1.PriorityLevelConfiguration
}
func newPLConfiguration(name string) *plBuilder {
return &plBuilder{
object: &flowcontrolv1beta1.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
},
}
}
func (b *plBuilder) Object() *flowcontrolv1beta1.PriorityLevelConfiguration {
return b.object
}
func (b *plBuilder) WithGeneration(value int64) *plBuilder {
b.object.SetGeneration(value)
return b
}
func (b *plBuilder) WithAutoUpdateAnnotation(value string) *plBuilder {
setAnnotation(b.object, value)
return b
}
func (b *plBuilder) WithLimited(assuredConcurrencyShares int32) *plBuilder {
b.object.Spec.Type = flowcontrolv1beta1.PriorityLevelEnablementLimited
b.object.Spec.Limited = &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: assuredConcurrencyShares,
LimitResponse: flowcontrolv1beta1.LimitResponse{
Type: flowcontrolv1beta1.LimitResponseTypeReject,
},
}
return b
}
// must be called after WithLimited
func (b *plBuilder) WithQueuing(queues, handSize, queueLengthLimit int32) *plBuilder {
limited := b.object.Spec.Limited
if limited == nil {
return b
}
limited.LimitResponse.Type = flowcontrolv1beta1.LimitResponseTypeQueue
limited.LimitResponse.Queuing = &flowcontrolv1beta1.QueuingConfiguration{
Queues: queues,
HandSize: handSize,
QueueLengthLimit: queueLengthLimit,
}
return b
}

View File

@ -0,0 +1,335 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ensurer
import (
"errors"
"fmt"
"strconv"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
"github.com/google/go-cmp/cmp"
)
const (
fieldManager = "api-priority-and-fairness-config-producer-v1"
)
// ensureStrategy provides a strategy for ensuring apf bootstrap configurationWrapper.
// We have two types of configurationWrapper objects:
// - mandatory: the mandatory configurationWrapper objects are about ensuring that the P&F
// system itself won't crash; we have to be sure there's 'catch-all' place for
// everything to go. Any changes made by the cluster operators to these
// configurationWrapper objects will be stomped by the apiserver.
//
// - suggested: additional configurationWrapper objects for initial behavior.
// the cluster operators have an option to edit or delete these configurationWrapper objects.
type ensureStrategy interface {
// Name of the strategy, for now we have two: 'mandatory' and 'suggested'.
// This comes handy in logging.
Name() string
// ShouldCreate returns true if a missing configuration object should be recreated.
ShouldCreate() bool
// ShouldUpdate accepts the current and the bootstrap configuration and determines
// whether an update is necessary.
// current is the existing in-cluster configuration object.
// bootstrap is the configuration the kube-apiserver maintains in-memory.
//
// ok: true if auto update is required, otherwise false
// object: the new object represents the new configuration to be stored in-cluster.
// err: err is set when the function runs into an error and can not
// determine if auto update is needed.
ShouldUpdate(current, bootstrap configurationObject) (object runtime.Object, ok bool, err error)
}
// this internal interface provides abstraction for dealing with the `Spec`
// of both 'FlowSchema' and 'PriorityLevelConfiguration' objects.
// Since the ensure logic for both types is common, we use a few internal interfaces
// to abstract out the differences of these two types.
type specCopier interface {
// HasSpecChanged returns true if the spec of both the bootstrap and
// the current configuration object is same, otherwise false.
HasSpecChanged(bootstrap, current runtime.Object) (bool, error)
// CopySpec makes a deep copy the spec of the bootstrap object
// and copies it to that of the current object.
// CopySpec assumes that the current object is safe to mutate, so it
// rests with the caller to make a deep copy of the current.
CopySpec(bootstrap, current runtime.Object) error
}
// this internal interface provides abstraction for CRUD operation
// related to both 'FlowSchema' and 'PriorityLevelConfiguration' objects.
// Since the ensure logic for both types is common, we use a few internal interfaces
// to abstract out the differences of these two types.
type configurationClient interface {
Create(object runtime.Object) (runtime.Object, error)
Update(object runtime.Object) (runtime.Object, error)
Get(name string) (configurationObject, error)
Delete(name string) error
}
type configurationWrapper interface {
// TypeName returns the type of the configuration that this interface deals with.
// We use it to log the type name of the configuration object being ensured.
// It is either 'PriorityLevelConfiguration' or 'FlowSchema'
TypeName() string
configurationClient
specCopier
}
// A convenient wrapper interface that is used by the ensure logic.
type configurationObject interface {
metav1.Object
runtime.Object
}
func newSuggestedEnsureStrategy(copier specCopier, shouldCreate bool) ensureStrategy {
return &strategy{
copier: copier,
shouldCreate: shouldCreate,
alwaysAutoUpdateSpec: false,
name: "suggested",
}
}
func newMandatoryEnsureStrategy(copier specCopier) ensureStrategy {
return &strategy{
copier: copier,
shouldCreate: true,
alwaysAutoUpdateSpec: true,
name: "mandatory",
}
}
// auto-update strategy for the configuration objects
type strategy struct {
copier specCopier
shouldCreate bool
alwaysAutoUpdateSpec bool
name string
}
func (s *strategy) Name() string {
return s.name
}
func (s *strategy) ShouldCreate() bool {
return s.shouldCreate
}
func (s *strategy) ShouldUpdate(current, bootstrap configurationObject) (runtime.Object, bool, error) {
if current == nil || bootstrap == nil {
return nil, false, nil
}
autoUpdateSpec := s.alwaysAutoUpdateSpec
if !autoUpdateSpec {
autoUpdateSpec = shouldUpdateSpec(current)
}
updateAnnotation := shouldUpdateAnnotation(current, autoUpdateSpec)
var specChanged bool
if autoUpdateSpec {
changed, err := s.copier.HasSpecChanged(bootstrap, current)
if err != nil {
return nil, false, fmt.Errorf("failed to compare spec - %w", err)
}
specChanged = changed
}
if !(updateAnnotation || specChanged) {
// the annotation key is up to date and the spec has not changed, no update is necessary
return nil, false, nil
}
// if we are here, either we need to update the annotation key or the spec.
copy, ok := current.DeepCopyObject().(configurationObject)
if !ok {
// we should never be here
return nil, false, errors.New("incompatible object type")
}
if updateAnnotation {
setAutoUpdateAnnotation(copy, autoUpdateSpec)
}
if specChanged {
s.copier.CopySpec(bootstrap, copy)
}
return copy, true, nil
}
// shouldUpdateSpec inspects the auto-update annotation key and generation field to determine
// whether the configurationWrapper object should be auto-updated.
func shouldUpdateSpec(accessor metav1.Object) bool {
value, _ := accessor.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey]
if autoUpdate, err := strconv.ParseBool(value); err == nil {
return autoUpdate
}
// We are here because of either a or b:
// a. the annotation key is missing.
// b. the annotation key is present but the value does not represent a boolean.
// In either case, if the operator hasn't changed the spec, we can safely auto update.
// Please note that we can't protect the changes made by the operator in the following scenario:
// - The operator deletes and recreates the same object with a variant spec (generation resets to 1).
if accessor.GetGeneration() == 1 {
return true
}
return false
}
// shouldUpdateAnnotation determines whether the current value of the auto-update annotation
// key matches the desired value.
func shouldUpdateAnnotation(accessor metav1.Object, desired bool) bool {
if value, ok := accessor.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey]; ok {
if current, err := strconv.ParseBool(value); err == nil && current == desired {
return false
}
}
return true
}
// setAutoUpdateAnnotation sets the auto-update annotation key to the specified value.
func setAutoUpdateAnnotation(accessor metav1.Object, autoUpdate bool) {
if accessor.GetAnnotations() == nil {
accessor.SetAnnotations(map[string]string{})
}
accessor.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey] = strconv.FormatBool(autoUpdate)
}
// ensureConfiguration ensures the boostrap configurationWrapper on the cluster based on the specified strategy.
func ensureConfiguration(wrapper configurationWrapper, strategy ensureStrategy, bootstrap configurationObject) error {
name := bootstrap.GetName()
configurationType := strategy.Name()
current, err := wrapper.Get(bootstrap.GetName())
if err != nil {
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed to retrieve %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
}
if strategy.ShouldCreate() {
if _, err := wrapper.Create(bootstrap); err != nil {
return fmt.Errorf("cannot create %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
}
klog.V(2).InfoS(fmt.Sprintf("Successfully created %s", wrapper.TypeName()), "type", configurationType, "name", name)
return nil
}
klog.V(5).InfoS(fmt.Sprintf("Skipping creation of %s", wrapper.TypeName()), "type", configurationType, "name", name)
return nil
}
klog.V(5).InfoS(fmt.Sprintf("The %s already exists, checking whether it is up to date", wrapper.TypeName()), "type", configurationType, "name", name)
newObject, update, err := strategy.ShouldUpdate(current, bootstrap)
if err != nil {
return fmt.Errorf("failed to determine whether auto-update is required for %s type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
}
if !update {
if klog.V(5).Enabled() {
// TODO: if we use structured logging here the diff gets escaped and very awkward to read in the log
klog.Infof("No update required for the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, bootstrap))
}
return nil
}
if _, err := wrapper.Update(newObject); err != nil {
return fmt.Errorf("failed to update the %s, will retry later type=%s name=%q error=%w", wrapper.TypeName(), configurationType, name, err)
}
klog.V(2).Infof("Updated the %s type=%s name=%q diff: %s", wrapper.TypeName(), configurationType, name, cmp.Diff(current, newObject))
return nil
}
func removeConfiguration(wrapper configurationWrapper, name string) error {
current, err := wrapper.Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to retrieve the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err)
}
value := current.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey]
autoUpdate, err := strconv.ParseBool(value)
if err != nil {
klog.ErrorS(err, fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name)
// This may need manual intervention, in case the annotation value is malformed,
// so don't return an error, that might trigger futile retry loop.
return nil
}
if !autoUpdate {
klog.V(5).InfoS(fmt.Sprintf("Skipping deletion of the %s", wrapper.TypeName()), "name", name)
return nil
}
if err := wrapper.Delete(name); err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return fmt.Errorf("failed to delete the %s, will retry later name=%q error=%w", wrapper.TypeName(), name, err)
}
klog.V(2).InfoS(fmt.Sprintf("Successfully deleted the %s", wrapper.TypeName()), "name", name)
return nil
}
// getRemoveCandidate returns a list of configuration objects we should delete
// from the cluster given a set of bootstrap and current configuration.
// bootstrap: a set of hard coded configuration kube-apiserver maintains in-memory.
// current: a set of configuration objects that exist on the cluster
// Any object present in current is a candidate for removal if both a and b are true:
// a. the object in current is missing from the bootstrap configuration
// b. the object has the designated auto-update annotation key
// This function shares the common logic for both FlowSchema and PriorityLevelConfiguration
// type and hence it accepts metav1.Object only.
func getRemoveCandidate(bootstrap sets.String, current []metav1.Object) []string {
if len(current) == 0 {
return nil
}
candidates := make([]string, 0)
for i := range current {
object := current[i]
if _, ok := object.GetAnnotations()[flowcontrolv1beta1.AutoUpdateAnnotationKey]; !ok {
// the configuration object does not have the annotation key
continue
}
if _, ok := bootstrap[object.GetName()]; !ok {
candidates = append(candidates, object.GetName())
}
}
return candidates
}

View File

@ -21,8 +21,6 @@ import (
"fmt"
"time"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
@ -37,6 +35,7 @@ import (
"k8s.io/kubernetes/pkg/apis/flowcontrol"
flowcontrolapisv1alpha1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1alpha1"
flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
"k8s.io/kubernetes/pkg/registry/flowcontrol/ensurer"
flowschemastore "k8s.io/kubernetes/pkg/registry/flowcontrol/flowschema/storage"
prioritylevelconfigurationstore "k8s.io/kubernetes/pkg/registry/flowcontrol/prioritylevelconfiguration/storage"
)
@ -101,70 +100,78 @@ func (p RESTStorageProvider) GroupName() string {
// PostStartHook returns the hook func that launches the config provider
func (p RESTStorageProvider) PostStartHook() (string, genericapiserver.PostStartHookFunc, error) {
return PostStartHookName, func(hookContext genericapiserver.PostStartHookContext) error {
flowcontrolClientSet := flowcontrolclient.NewForConfigOrDie(hookContext.LoopbackClientConfig)
go func() {
const retryCreatingSuggestedSettingsInterval = time.Second
err := wait.PollImmediateUntil(
retryCreatingSuggestedSettingsInterval,
func() (bool, error) {
should, err := shouldEnsureSuggested(flowcontrolClientSet)
if err != nil {
klog.Errorf("failed getting exempt flow-schema, will retry later: %v", err)
return false, nil
}
if !should {
return true, nil
}
err = ensure(
flowcontrolClientSet,
flowcontrolbootstrap.SuggestedFlowSchemas,
flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
if err != nil {
klog.Errorf("failed ensuring suggested settings, will retry later: %v", err)
return false, nil
}
return true, nil
},
hookContext.StopCh)
if err != nil {
klog.ErrorS(err, "Ensuring suggested configuration failed")
// We should not attempt creation of mandatory objects if ensuring the suggested
// configuration resulted in an error.
// This only happens when the stop channel is closed.
// We rely on the presence of the "exempt" priority level configuration object in the cluster
// to indicate whether we should ensure suggested configuration.
return
}
const retryCreatingMandatorySettingsInterval = time.Minute
_ = wait.PollImmediateUntil(
retryCreatingMandatorySettingsInterval,
func() (bool, error) {
if err := upgrade(
flowcontrolClientSet,
flowcontrolbootstrap.MandatoryFlowSchemas,
// Note: the "exempt" priority-level is supposed to be the last item in the pre-defined
// list, so that a crash in the midst of the first kube-apiserver startup does not prevent
// the full initial set of objects from being created.
flowcontrolbootstrap.MandatoryPriorityLevelConfigurations,
); err != nil {
klog.Errorf("failed creating mandatory flowcontrol settings: %v", err)
return false, nil
}
return false, nil // always retry
},
hookContext.StopCh)
}()
return nil
}, nil
return PostStartHookName, ensureAPFBootstrapConfiguration, nil
}
// shouldEnsureSuggested checks if the exempt priority level exists and returns
func ensureAPFBootstrapConfiguration(hookContext genericapiserver.PostStartHookContext) error {
clientset, err := flowcontrolclient.NewForConfig(hookContext.LoopbackClientConfig)
if err != nil {
return fmt.Errorf("failed to initialize clientset for APF - %w", err)
}
// get a derived context that gets cancelled after 5m or
// when the StopCh gets closed, whichever happens first.
ctx, cancel := contextFromChannelAndMaxWaitDuration(hookContext.StopCh, 5*time.Minute)
defer cancel()
err = wait.PollImmediateUntilWithContext(
ctx,
time.Second,
func(context.Context) (bool, error) {
if err := ensure(clientset); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
return false, nil
}
return true, nil
})
if err != nil {
return fmt.Errorf("unable to initialize APF bootstrap configuration")
}
// we have successfully initialized the bootstrap configuration, now we
// spin up a goroutine which reconciles the bootstrap configuration periodically.
go func() {
err := wait.PollImmediateUntil(
time.Minute,
func() (bool, error) {
if err := ensure(clientset); err != nil {
klog.ErrorS(err, "APF bootstrap ensurer ran into error, will retry later")
}
// always auto update both suggested and mandatory configuration
return false, nil
}, hookContext.StopCh)
if err != nil {
klog.ErrorS(err, "APF bootstrap ensurer is exiting")
}
}()
return nil
}
func ensure(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error {
if err := ensureSuggestedConfiguration(clientset); err != nil {
// We should not attempt creation of mandatory objects if ensuring the suggested
// configuration resulted in an error.
// This only happens when the stop channel is closed.
// We rely on the presence of the "exempt" priority level configuration object in the cluster
// to indicate whether we should ensure suggested configuration.
return fmt.Errorf("failed ensuring suggested settings - %w", err)
}
if err := ensureMandatoryConfiguration(clientset); err != nil {
return fmt.Errorf("failed ensuring mandatory settings - %w", err)
}
if err := removeConfiguration(clientset); err != nil {
return fmt.Errorf("failed to delete removed settings - %w", err)
}
return nil
}
// shouldCreateSuggested checks if the exempt priority level exists and returns
// whether the suggested flow schemas and priority levels should be ensured.
func shouldEnsureSuggested(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface) (bool, error) {
func shouldCreateSuggested(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface) (bool, error) {
if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), flowcontrol.PriorityLevelConfigurationNameExempt, metav1.GetOptions{}); err != nil {
if apierrors.IsNotFound(err) {
return true, nil
@ -174,106 +181,89 @@ func shouldEnsureSuggested(flowcontrolClientSet flowcontrolclient.FlowcontrolV1b
return false, nil
}
const thisFieldManager = "api-priority-and-fairness-config-producer-v1"
func ensureSuggestedConfiguration(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error {
shouldCreateSuggested, err := shouldCreateSuggested(clientset)
if err != nil {
return fmt.Errorf("failed to determine whether suggested configuration should be created - error: %w", err)
}
func ensure(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface, flowSchemas []*flowcontrolv1beta1.FlowSchema, priorityLevels []*flowcontrolv1beta1.PriorityLevelConfiguration) error {
for _, flowSchema := range flowSchemas {
_, err := flowcontrolClientSet.FlowSchemas().Create(context.TODO(), flowSchema, metav1.CreateOptions{FieldManager: thisFieldManager})
if apierrors.IsAlreadyExists(err) {
klog.V(3).Infof("Suggested FlowSchema %s already exists, skipping creating", flowSchema.Name)
continue
}
if err != nil {
return fmt.Errorf("cannot create suggested FlowSchema %s due to %v", flowSchema.Name, err)
}
klog.V(3).Infof("Created suggested FlowSchema %s", flowSchema.Name)
fsEnsurer := ensurer.NewSuggestedFlowSchemaEnsurer(clientset.FlowSchemas(), shouldCreateSuggested)
if err := fsEnsurer.Ensure(flowcontrolbootstrap.SuggestedFlowSchemas); err != nil {
return err
}
for _, priorityLevelConfiguration := range priorityLevels {
_, err := flowcontrolClientSet.PriorityLevelConfigurations().Create(context.TODO(), priorityLevelConfiguration, metav1.CreateOptions{FieldManager: thisFieldManager})
if apierrors.IsAlreadyExists(err) {
klog.V(3).Infof("Suggested PriorityLevelConfiguration %s already exists, skipping creating", priorityLevelConfiguration.Name)
continue
}
if err != nil {
return fmt.Errorf("cannot create suggested PriorityLevelConfiguration %s due to %v", priorityLevelConfiguration.Name, err)
}
klog.V(3).Infof("Created suggested PriorityLevelConfiguration %s", priorityLevelConfiguration.Name)
}
return nil
plEnsurer := ensurer.NewSuggestedPriorityLevelEnsurerEnsurer(clientset.PriorityLevelConfigurations(), shouldCreateSuggested)
return plEnsurer.Ensure(flowcontrolbootstrap.SuggestedPriorityLevelConfigurations)
}
func upgrade(flowcontrolClientSet flowcontrolclient.FlowcontrolV1beta1Interface, flowSchemas []*flowcontrolv1beta1.FlowSchema, priorityLevels []*flowcontrolv1beta1.PriorityLevelConfiguration) error {
for _, expectedFlowSchema := range flowSchemas {
actualFlowSchema, err := flowcontrolClientSet.FlowSchemas().Get(context.TODO(), expectedFlowSchema.Name, metav1.GetOptions{})
if err == nil {
// TODO(yue9944882): extract existing version from label and compare
// TODO(yue9944882): create w/ version string attached
wrongSpec, err := flowSchemaHasWrongSpec(expectedFlowSchema, actualFlowSchema)
if err != nil {
return fmt.Errorf("failed checking if mandatory FlowSchema %s is up-to-date due to %v, will retry later", expectedFlowSchema.Name, err)
}
if wrongSpec {
if _, err := flowcontrolClientSet.FlowSchemas().Update(context.TODO(), expectedFlowSchema, metav1.UpdateOptions{FieldManager: thisFieldManager}); err != nil {
return fmt.Errorf("failed upgrading mandatory FlowSchema %s due to %v, will retry later", expectedFlowSchema.Name, err)
}
klog.V(3).Infof("Updated mandatory FlowSchema %s because its spec was %#+v but it must be %#+v", expectedFlowSchema.Name, actualFlowSchema.Spec, expectedFlowSchema.Spec)
}
continue
}
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed getting mandatory FlowSchema %s due to %v, will retry later", expectedFlowSchema.Name, err)
}
_, err = flowcontrolClientSet.FlowSchemas().Create(context.TODO(), expectedFlowSchema, metav1.CreateOptions{FieldManager: thisFieldManager})
if apierrors.IsAlreadyExists(err) {
klog.V(3).Infof("Mandatory FlowSchema %s already exists, skipping creating", expectedFlowSchema.Name)
continue
}
if err != nil {
return fmt.Errorf("cannot create mandatory FlowSchema %s due to %v", expectedFlowSchema.Name, err)
}
klog.V(3).Infof("Created mandatory FlowSchema %s", expectedFlowSchema.Name)
func ensureMandatoryConfiguration(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error {
fsEnsurer := ensurer.NewMandatoryFlowSchemaEnsurer(clientset.FlowSchemas())
if err := fsEnsurer.Ensure(flowcontrolbootstrap.MandatoryFlowSchemas); err != nil {
return err
}
for _, expectedPriorityLevelConfiguration := range priorityLevels {
actualPriorityLevelConfiguration, err := flowcontrolClientSet.PriorityLevelConfigurations().Get(context.TODO(), expectedPriorityLevelConfiguration.Name, metav1.GetOptions{})
if err == nil {
// TODO(yue9944882): extract existing version from label and compare
// TODO(yue9944882): create w/ version string attached
wrongSpec, err := priorityLevelHasWrongSpec(expectedPriorityLevelConfiguration, actualPriorityLevelConfiguration)
if err != nil {
return fmt.Errorf("failed checking if mandatory PriorityLevelConfiguration %s is up-to-date due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
}
if wrongSpec {
if _, err := flowcontrolClientSet.PriorityLevelConfigurations().Update(context.TODO(), expectedPriorityLevelConfiguration, metav1.UpdateOptions{FieldManager: thisFieldManager}); err != nil {
return fmt.Errorf("failed upgrading mandatory PriorityLevelConfiguration %s due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
}
klog.V(3).Infof("Updated mandatory PriorityLevelConfiguration %s because its spec was %#+v but must be %#+v", expectedPriorityLevelConfiguration.Name, actualPriorityLevelConfiguration.Spec, expectedPriorityLevelConfiguration.Spec)
}
continue
}
if !apierrors.IsNotFound(err) {
return fmt.Errorf("failed getting PriorityLevelConfiguration %s due to %v, will retry later", expectedPriorityLevelConfiguration.Name, err)
}
_, err = flowcontrolClientSet.PriorityLevelConfigurations().Create(context.TODO(), expectedPriorityLevelConfiguration, metav1.CreateOptions{FieldManager: thisFieldManager})
if apierrors.IsAlreadyExists(err) {
klog.V(3).Infof("Mandatory PriorityLevelConfiguration %s already exists, skipping creating", expectedPriorityLevelConfiguration.Name)
continue
}
if err != nil {
return fmt.Errorf("cannot create mandatory PriorityLevelConfiguration %s due to %v", expectedPriorityLevelConfiguration.Name, err)
}
klog.V(3).Infof("Created mandatory PriorityLevelConfiguration %s", expectedPriorityLevelConfiguration.Name)
}
return nil
plEnsurer := ensurer.NewMandatoryPriorityLevelEnsurer(clientset.PriorityLevelConfigurations())
return plEnsurer.Ensure(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations)
}
func flowSchemaHasWrongSpec(expected, actual *flowcontrolv1beta1.FlowSchema) (bool, error) {
copiedExpectedFlowSchema := expected.DeepCopy()
flowcontrolapisv1beta1.SetObjectDefaults_FlowSchema(copiedExpectedFlowSchema)
return !equality.Semantic.DeepEqual(copiedExpectedFlowSchema.Spec, actual.Spec), nil
func removeConfiguration(clientset flowcontrolclient.FlowcontrolV1beta1Interface) error {
if err := removeFlowSchema(clientset.FlowSchemas()); err != nil {
return err
}
return removePriorityLevel(clientset.PriorityLevelConfigurations())
}
func priorityLevelHasWrongSpec(expected, actual *flowcontrolv1beta1.PriorityLevelConfiguration) (bool, error) {
copiedExpectedPriorityLevel := expected.DeepCopy()
flowcontrolapisv1beta1.SetObjectDefaults_PriorityLevelConfiguration(copiedExpectedPriorityLevel)
return !equality.Semantic.DeepEqual(copiedExpectedPriorityLevel.Spec, actual.Spec), nil
func removeFlowSchema(client flowcontrolclient.FlowSchemaInterface) error {
bootstrap := append(flowcontrolbootstrap.MandatoryFlowSchemas, flowcontrolbootstrap.SuggestedFlowSchemas...)
candidates, err := ensurer.GetFlowSchemaRemoveCandidate(client, bootstrap)
if err != nil {
return err
}
if len(candidates) == 0 {
return nil
}
fsRemover := ensurer.NewFlowSchemaRemover(client)
return fsRemover.Remove(candidates)
}
func removePriorityLevel(client flowcontrolclient.PriorityLevelConfigurationInterface) error {
bootstrap := append(flowcontrolbootstrap.MandatoryPriorityLevelConfigurations, flowcontrolbootstrap.SuggestedPriorityLevelConfigurations...)
candidates, err := ensurer.GetPriorityLevelRemoveCandidate(client, bootstrap)
if err != nil {
return err
}
if len(candidates) == 0 {
return nil
}
plRemover := ensurer.NewPriorityLevelRemover(client)
return plRemover.Remove(candidates)
}
// contextFromChannelAndMaxWaitDuration returns a Context that is bound to the
// specified channel and the wait duration. The derived context will be
// cancelled when the specified channel stopCh is closed or the maximum wait
// duration specified in maxWait elapses, whichever happens first.
//
// Note the caller must *always* call the CancelFunc, otherwise resources may be leaked.
func contextFromChannelAndMaxWaitDuration(stopCh <-chan struct{}, maxWait time.Duration) (context.Context, context.CancelFunc) {
ctx, cancel := context.WithCancel(context.Background())
go func() {
defer cancel()
select {
case <-stopCh:
case <-time.After(maxWait):
// the caller can explicitly cancel the context which is an
// indication to us to exit the goroutine immediately.
// Note that we are calling cancel more than once when we are here,
// CancelFunc is idempotent and we expect no ripple effects here.
case <-ctx.Done():
}
}()
return ctx, cancel
}

View File

@ -19,14 +19,13 @@ package rest
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
flowcontrolv1beta1 "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap"
"k8s.io/client-go/kubernetes/fake"
flowcontrolapisv1beta1 "k8s.io/kubernetes/pkg/apis/flowcontrol/v1beta1"
)
func TestShouldEnsurePredefinedSettings(t *testing.T) {
@ -53,121 +52,40 @@ func TestShouldEnsurePredefinedSettings(t *testing.T) {
if testCase.existingPriorityLevel != nil {
c.FlowcontrolV1beta1().PriorityLevelConfigurations().Create(context.TODO(), testCase.existingPriorityLevel, metav1.CreateOptions{})
}
should, err := shouldEnsureSuggested(c.FlowcontrolV1beta1())
should, err := shouldCreateSuggested(c.FlowcontrolV1beta1())
assert.NoError(t, err)
assert.Equal(t, testCase.expected, should)
})
}
}
func TestFlowSchemaHasWrongSpec(t *testing.T) {
fs1 := &flowcontrolv1beta1.FlowSchema{
Spec: flowcontrolv1beta1.FlowSchemaSpec{},
func TestContextFromChannelAndMaxWaitDurationWithChannelClosed(t *testing.T) {
stopCh := make(chan struct{})
ctx, cancel := contextFromChannelAndMaxWaitDuration(stopCh, time.Hour)
defer cancel()
select {
case <-ctx.Done():
t.Fatalf("Expected the derived context to be not cancelled, but got: %v", ctx.Err())
default:
}
fs2 := &flowcontrolv1beta1.FlowSchema{
Spec: flowcontrolv1beta1.FlowSchemaSpec{
MatchingPrecedence: 1,
},
}
fs1Defaulted := &flowcontrolv1beta1.FlowSchema{
Spec: flowcontrolv1beta1.FlowSchemaSpec{
MatchingPrecedence: flowcontrolapisv1beta1.FlowSchemaDefaultMatchingPrecedence,
},
}
testCases := []struct {
name string
expected *flowcontrolv1beta1.FlowSchema
actual *flowcontrolv1beta1.FlowSchema
hasWrongSpec bool
}{
{
name: "identical flow-schemas should work",
expected: bootstrap.MandatoryFlowSchemaCatchAll,
actual: bootstrap.MandatoryFlowSchemaCatchAll,
hasWrongSpec: false,
},
{
name: "defaulted flow-schemas should work",
expected: fs1,
actual: fs1Defaulted,
hasWrongSpec: false,
},
{
name: "non-defaulted flow-schema has wrong spec",
expected: fs1,
actual: fs2,
hasWrongSpec: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
w, err := flowSchemaHasWrongSpec(testCase.expected, testCase.actual)
require.NoError(t, err)
assert.Equal(t, testCase.hasWrongSpec, w)
})
close(stopCh)
<-ctx.Done()
if ctx.Err() != context.Canceled {
t.Errorf("Expected the context to be canceled with: %v, but got: %v", context.Canceled, ctx.Err())
}
}
func TestPriorityLevelHasWrongSpec(t *testing.T) {
pl1 := &flowcontrolv1beta1.PriorityLevelConfiguration{
Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{
Type: flowcontrolv1beta1.PriorityLevelEnablementLimited,
Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{
LimitResponse: flowcontrolv1beta1.LimitResponse{
Type: flowcontrolv1beta1.LimitResponseTypeReject,
},
},
},
}
pl2 := &flowcontrolv1beta1.PriorityLevelConfiguration{
Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{
Type: flowcontrolv1beta1.PriorityLevelEnablementLimited,
Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: 1,
},
},
}
pl1Defaulted := &flowcontrolv1beta1.PriorityLevelConfiguration{
Spec: flowcontrolv1beta1.PriorityLevelConfigurationSpec{
Type: flowcontrolv1beta1.PriorityLevelEnablementLimited,
Limited: &flowcontrolv1beta1.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: flowcontrolapisv1beta1.PriorityLevelConfigurationDefaultAssuredConcurrencyShares,
LimitResponse: flowcontrolv1beta1.LimitResponse{
Type: flowcontrolv1beta1.LimitResponseTypeReject,
},
},
},
}
testCases := []struct {
name string
expected *flowcontrolv1beta1.PriorityLevelConfiguration
actual *flowcontrolv1beta1.PriorityLevelConfiguration
hasWrongSpec bool
}{
{
name: "identical priority-level should work",
expected: bootstrap.MandatoryPriorityLevelConfigurationCatchAll,
actual: bootstrap.MandatoryPriorityLevelConfigurationCatchAll,
hasWrongSpec: false,
},
{
name: "defaulted priority-level should work",
expected: pl1,
actual: pl1Defaulted,
hasWrongSpec: false,
},
{
name: "non-defaulted priority-level has wrong spec",
expected: pl1,
actual: pl2,
hasWrongSpec: true,
},
}
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
w, err := priorityLevelHasWrongSpec(testCase.expected, testCase.actual)
require.NoError(t, err)
assert.Equal(t, testCase.hasWrongSpec, w)
})
func TestContextFromChannelAndMaxWaitDurationWithMaxWaitElapsed(t *testing.T) {
stopCh := make(chan struct{})
ctx, cancel := contextFromChannelAndMaxWaitDuration(stopCh, 100*time.Millisecond)
defer cancel()
<-ctx.Done()
if ctx.Err() != context.Canceled {
t.Errorf("Expected the context to be canceled with: %v, but got: %v", context.Canceled, ctx.Err())
}
}

View File

@ -57,6 +57,50 @@ const (
ResponseHeaderMatchedFlowSchemaUID = "X-Kubernetes-PF-FlowSchema-UID"
)
const (
// AutoUpdateAnnotationKey is the name of an annotation that enables
// automatic update of the spec of the bootstrap configuration
// object(s), if set to 'true'.
//
// On a fresh install, all bootstrap configuration objects will have auto
// update enabled with the following annotation key:
// apf.kubernetes.io/autoupdate-spec: 'true'
//
// The kube-apiserver periodically checks the bootstrap configuration
// objects on the cluster and applies updates if necessary.
//
// kube-apiserver enforces an 'always auto-update' policy for the
// mandatory configuration object(s). This implies:
// - the auto-update annotation key is added with a value of 'true'
// if it is missing.
// - the auto-update annotation key is set to 'true' if its current value
// is a boolean false or has an invalid boolean representation
// (if the cluster operator sets it to 'false' it will be stomped)
// - any changes to the spec made by the cluster operator will be
// stomped.
//
// The kube-apiserver will apply updates on the suggested configuration if:
// - the cluster operator has enabled auto-update by setting the annotation
// (apf.kubernetes.io/autoupdate-spec: 'true') or
// - the annotation key is missing but the generation is 1
//
// If the suggested configuration object is missing the annotation key,
// kube-apiserver will update the annotation appropriately:
// - it is set to 'true' if generation of the object is '1' which usually
// indicates that the spec of the object has not been changed.
// - it is set to 'false' if generation of the object is greater than 1.
//
// The goal is to enable the kube-apiserver to apply update on suggested
// configuration objects installed by previous releases but not overwrite
// changes made by the cluster operators.
// Note that this distinction is imperfectly detected: in the case where an
// operator deletes a suggested configuration object and later creates it
// but with a variant spec and then does no updates of the object
// (generation is 1), the technique outlined above will incorrectly
// determine that the object should be auto-updated.
AutoUpdateAnnotationKey = "apf.kubernetes.io/autoupdate-spec"
)
// +genclient
// +genclient:nonNamespaced
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

View File

@ -455,8 +455,14 @@ var (
func newPriorityLevelConfiguration(name string, spec flowcontrol.PriorityLevelConfigurationSpec) *flowcontrol.PriorityLevelConfiguration {
return &flowcontrol.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{Name: name},
Spec: spec}
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: map[string]string{
flowcontrol.AutoUpdateAnnotationKey: "true",
},
},
Spec: spec,
}
}
func newFlowSchema(name, plName string, matchingPrecedence int32, dmType flowcontrol.FlowDistinguisherMethodType, rules ...flowcontrol.PolicyRulesWithSubjects) *flowcontrol.FlowSchema {
@ -465,7 +471,12 @@ func newFlowSchema(name, plName string, matchingPrecedence int32, dmType flowcon
dm = &flowcontrol.FlowDistinguisherMethod{Type: dmType}
}
return &flowcontrol.FlowSchema{
ObjectMeta: metav1.ObjectMeta{Name: name},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Annotations: map[string]string{
flowcontrol.AutoUpdateAnnotationKey: "true",
},
},
Spec: flowcontrol.FlowSchemaSpec{
PriorityLevelConfiguration: flowcontrol.PriorityLevelConfigurationReference{
Name: plName,