From ecc640f1b40f14a894269e4b2ae6c80158626e93 Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Mon, 19 Sep 2022 14:03:16 -0400 Subject: [PATCH] apiserver: use SSA for apf configuration client --- .../pkg/util/flowcontrol/apf_controller.go | 47 +++++----- .../pkg/util/flowcontrol/patch_test.go | 88 ------------------- 2 files changed, 24 insertions(+), 111 deletions(-) delete mode 100644 staging/src/k8s.io/apiserver/pkg/util/flowcontrol/patch_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go index 5ce1b95169e..b25db08744f 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_controller.go @@ -20,7 +20,6 @@ import ( "context" "crypto/sha256" "encoding/binary" - "encoding/json" "errors" "fmt" "math" @@ -34,7 +33,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - apitypes "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" @@ -53,6 +51,7 @@ import ( "k8s.io/utils/clock" flowcontrol "k8s.io/api/flowcontrol/v1beta3" + flowcontrolapplyconfiguration "k8s.io/client-go/applyconfigurations/flowcontrol/v1beta3" flowcontrolclient "k8s.io/client-go/kubernetes/typed/flowcontrol/v1beta3" flowcontrollister "k8s.io/client-go/listers/flowcontrol/v1beta3" ) @@ -420,19 +419,12 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior // if we are going to issue an update, be sure we track every name we update so we know if we update it too often. currResult.updatedItems.Insert(fsu.flowSchema.Name) - patchBytes, err := makeFlowSchemaConditionPatch(fsu.condition) - if err != nil { - // should never happen because these conditions are created here and well formed - panic(fmt.Sprintf("Failed to json.Marshall(%#+v): %s", fsu.condition, err.Error())) - } if klogV := klog.V(4); klogV.Enabled() { klogV.Infof("%s writing Condition %s to FlowSchema %s, which had ResourceVersion=%s, because its previous value was %s, diff: %s", cfgCtlr.name, fsu.condition, fsu.flowSchema.Name, fsu.flowSchema.ResourceVersion, fcfmt.Fmt(fsu.oldValue), cmp.Diff(fsu.oldValue, fsu.condition)) } - fsIfc := cfgCtlr.flowcontrolClient.FlowSchemas() - patchOptions := metav1.PatchOptions{FieldManager: cfgCtlr.asFieldManager} - _, err = fsIfc.Patch(context.TODO(), fsu.flowSchema.Name, apitypes.StrategicMergePatchType, patchBytes, patchOptions, "status") - if err != nil { + + if err := apply(cfgCtlr.flowcontrolClient.FlowSchemas(), fsu, cfgCtlr.asFieldManager); err != nil { if apierrors.IsNotFound(err) { // This object has been deleted. A notification is coming // and nothing more needs to be done here. @@ -447,18 +439,27 @@ func (cfgCtlr *configController) digestConfigObjects(newPLs []*flowcontrol.Prior return suggestedDelay, utilerrors.NewAggregate(errs) } -// makeFlowSchemaConditionPatch takes in a condition and returns the patch status as a json. -func makeFlowSchemaConditionPatch(condition flowcontrol.FlowSchemaCondition) ([]byte, error) { - o := struct { - Status flowcontrol.FlowSchemaStatus `json:"status"` - }{ - Status: flowcontrol.FlowSchemaStatus{ - Conditions: []flowcontrol.FlowSchemaCondition{ - condition, - }, - }, - } - return json.Marshal(o) +func apply(client flowcontrolclient.FlowSchemaInterface, fsu fsStatusUpdate, asFieldManager string) error { + applyOptions := metav1.ApplyOptions{FieldManager: asFieldManager, Force: true} + + // the condition field in fsStatusUpdate holds the new condition we want to update. + // TODO: this will break when we have multiple conditions for a flowschema + _, err := client.ApplyStatus(context.TODO(), toFlowSchemaApplyConfiguration(fsu), applyOptions) + return err +} + +func toFlowSchemaApplyConfiguration(fsUpdate fsStatusUpdate) *flowcontrolapplyconfiguration.FlowSchemaApplyConfiguration { + condition := flowcontrolapplyconfiguration.FlowSchemaCondition(). + WithType(fsUpdate.condition.Type). + WithStatus(fsUpdate.condition.Status). + WithReason(fsUpdate.condition.Reason). + WithLastTransitionTime(fsUpdate.condition.LastTransitionTime). + WithMessage(fsUpdate.condition.Message) + + return flowcontrolapplyconfiguration.FlowSchema(fsUpdate.flowSchema.Name). + WithStatus(flowcontrolapplyconfiguration.FlowSchemaStatus(). + WithConditions(condition), + ) } // shouldDelayUpdate checks to see if a flowschema has been updated too often and returns true if a delay is needed. diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/patch_test.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/patch_test.go deleted file mode 100644 index 6bb034294ae..00000000000 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/patch_test.go +++ /dev/null @@ -1,88 +0,0 @@ -/* -Copyright 2021 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package flowcontrol - -import ( - "fmt" - "reflect" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - flowcontrol "k8s.io/api/flowcontrol/v1beta3" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func Test_configController_generatePatchBytes(t *testing.T) { - now := time.Now().UTC() - tests := []struct { - name string - condition flowcontrol.FlowSchemaCondition - want []byte - }{ - { - name: "check if only condition is parsed", - condition: flowcontrol.FlowSchemaCondition{ - Type: flowcontrol.FlowSchemaConditionDangling, - Status: flowcontrol.ConditionTrue, - Reason: "test reason", - Message: "test none", - LastTransitionTime: metav1.NewTime(now), - }, - want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test none"}]}}`, now.Format(time.RFC3339))), - }, - { - name: "check when message has double quotes", - condition: flowcontrol.FlowSchemaCondition{ - Type: flowcontrol.FlowSchemaConditionDangling, - Status: flowcontrol.ConditionTrue, - Reason: "test reason", - Message: `test ""none`, - LastTransitionTime: metav1.NewTime(now), - }, - want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test \"\"none"}]}}`, now.Format(time.RFC3339))), - }, - { - name: "check when message has a whitespace character that can be escaped", - condition: flowcontrol.FlowSchemaCondition{ - Type: flowcontrol.FlowSchemaConditionDangling, - Status: flowcontrol.ConditionTrue, - Reason: "test reason", - Message: "test \u0009\u0009none", - LastTransitionTime: metav1.NewTime(now), - }, - want: []byte(fmt.Sprintf(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":"%s","reason":"test reason","message":"test \t\tnone"}]}}`, now.Format(time.RFC3339))), - }, - { - name: "check when a few fields (message & lastTransitionTime) are missing", - condition: flowcontrol.FlowSchemaCondition{ - Type: flowcontrol.FlowSchemaConditionDangling, - Status: flowcontrol.ConditionTrue, - Reason: "test reason", - }, - want: []byte(`{"status":{"conditions":[{"type":"Dangling","status":"True","lastTransitionTime":null,"reason":"test reason"}]}}`), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got, _ := makeFlowSchemaConditionPatch(tt.condition) - if !reflect.DeepEqual(got, tt.want) { - t.Errorf("makeFlowSchemaConditionPatch() got = %s, want %s; diff is %s", got, tt.want, cmp.Diff(tt.want, got)) - } - }) - } -}