From de25c5fdcf91388f6b9e30c099e9131d985bcb32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Przychodze=C5=84?= Date: Tue, 18 Oct 2022 12:01:15 +0000 Subject: [PATCH] NodeLifecycleController: Remove race condition A Patch request does not support RV by default, so we need to include it explicitly. In addition, patching a list actually overwrites the whole field. This means that there is a race condition in which we can overwrite changes to taints that happened between the GET and PATCH requests. --- pkg/controller/controller_utils.go | 11 ++++-- pkg/controller/controller_utils_test.go | 37 +++++++++++++++++++ .../node_lifecycle_controller_test.go | 2 +- pkg/controller/testutil/test_utils.go | 17 ++++++++- .../cloud-provider/node/helpers/taints.go | 11 ++++-- 5 files changed, 70 insertions(+), 8 deletions(-) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index 241294cded6..5c897da432d 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -1127,9 +1127,14 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str // PatchNodeTaints patches node's taints. func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error { - oldData, err := json.Marshal(oldNode) + // Strip the RV from the base node of the diff to ensure that our Patch request will set RV to check for conflicts over .spec.taints. + // This is needed because .spec.taints does not specify patchMergeKey and patchStrategy, and adding them is no longer an option for compatibility reasons. + // Using another Patch strategy works for adding new taints, but it will not resolve the problem with taint removal. 
+ oldNodeNoRV := oldNode.DeepCopy() + oldNodeNoRV.ResourceVersion = "" + oldDataNoRV, err := json.Marshal(&oldNodeNoRV) if err != nil { - return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err) + return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err) } newTaints := newNode.Spec.Taints @@ -1140,7 +1145,7 @@ func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err) } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{}) if err != nil { return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) } diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index ce57a010953..28474fd2f5f 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -989,6 +989,43 @@ func TestAddOrUpdateTaintOnNode(t *testing.T) { }, requestCount: 1, }, + { + name: "add taint to changed node", + nodeHandler: &testutil.FakeNodeHandler{ + Existing: []*v1.Node{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + ResourceVersion: "1", + }, + Spec: v1.NodeSpec{ + Taints: []v1.Taint{ + {Key: "key1", Value: "value1", Effect: "NoSchedule"}, + }, + }, + }, + }, + Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}), + AsyncCalls: []func(*testutil.FakeNodeHandler){func(m *testutil.FakeNodeHandler) { + if len(m.UpdatedNodes) == 0 { + m.UpdatedNodes = append(m.UpdatedNodes, &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + ResourceVersion: "2", + }, + Spec: v1.NodeSpec{ + Taints: []v1.Taint{}, + }}) + } + }}, + }, + nodeName: "node1", + taintsToAdd: []*v1.Taint{{Key: "key2", Value: "value2", Effect: "NoExecute"}}, + expectedTaints: 
[]v1.Taint{ + {Key: "key2", Value: "value2", Effect: "NoExecute"}, + }, + requestCount: 5, + }, } for _, test := range tests { err := AddOrUpdateTaintOnNode(context.TODO(), test.nodeHandler, test.nodeName, test.taintsToAdd...) diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go index 41fc78cf0d3..0d87930049a 100644 --- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go +++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go @@ -54,7 +54,7 @@ const ( testNodeMonitorGracePeriod = 40 * time.Second testNodeStartupGracePeriod = 60 * time.Second testNodeMonitorPeriod = 5 * time.Second - testRateLimiterQPS = float32(10000) + testRateLimiterQPS = float32(100000) testLargeClusterThreshold = 20 testUnhealthyThreshold = float32(0.55) ) diff --git a/pkg/controller/testutil/test_utils.go b/pkg/controller/testutil/test_utils.go index b29a4feb723..9973215bb2f 100644 --- a/pkg/controller/testutil/test_utils.go +++ b/pkg/controller/testutil/test_utils.go @@ -31,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/strategicpatch" @@ -64,6 +65,7 @@ type FakeNodeHandler struct { // Input: Hooks determine if request is valid or not CreateHook func(*FakeNodeHandler, *v1.Node) bool Existing []*v1.Node + AsyncCalls []func(*FakeNodeHandler) // Output CreatedNodes []*v1.Node @@ -131,10 +133,11 @@ func (m *FakeNodeHandler) Create(_ context.Context, node *v1.Node, _ metav1.Crea } // Get returns a Node from the fake store. 
-func (m *FakeNodeHandler) Get(_ context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) { +func (m *FakeNodeHandler) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) { m.lock.Lock() defer func() { m.RequestCount++ + m.runAsyncCalls() m.lock.Unlock() }() for i := range m.UpdatedNodes { @@ -152,6 +155,12 @@ func (m *FakeNodeHandler) Get(_ context.Context, name string, opts metav1.GetOpt return nil, nil } +func (m *FakeNodeHandler) runAsyncCalls() { + for _, a := range m.AsyncCalls { + a(m) + } +} + // List returns a list of Nodes from the fake store. func (m *FakeNodeHandler) List(_ context.Context, opts metav1.ListOptions) (*v1.NodeList, error) { m.lock.Lock() @@ -212,6 +221,9 @@ func (m *FakeNodeHandler) Update(_ context.Context, node *v1.Node, _ metav1.Upda nodeCopy := *node for i, updateNode := range m.UpdatedNodes { if updateNode.Name == nodeCopy.Name { + if updateNode.GetObjectMeta().GetResourceVersion() != nodeCopy.GetObjectMeta().GetResourceVersion() { + return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil) + } m.UpdatedNodes[i] = &nodeCopy return node, nil } @@ -345,6 +357,9 @@ func (m *FakeNodeHandler) Patch(_ context.Context, name string, pt types.PatchTy if updatedNodeIndex < 0 { m.UpdatedNodes = append(m.UpdatedNodes, &updatedNode) } else { + if updatedNode.GetObjectMeta().GetResourceVersion() != m.UpdatedNodes[updatedNodeIndex].GetObjectMeta().GetResourceVersion() { + return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil) + } m.UpdatedNodes[updatedNodeIndex] = &updatedNode } diff --git a/staging/src/k8s.io/cloud-provider/node/helpers/taints.go b/staging/src/k8s.io/cloud-provider/node/helpers/taints.go index ca6d27336a3..fb15d64bd24 100644 --- a/staging/src/k8s.io/cloud-provider/node/helpers/taints.go +++ b/staging/src/k8s.io/cloud-provider/node/helpers/taints.go @@ -89,9 +89,14 @@ func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taints 
...*v // PatchNodeTaints patches node's taints. func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error { - oldData, err := json.Marshal(oldNode) + // Strip the RV from the base node of the diff to ensure that our Patch request will set RV to check for conflicts over .spec.taints. + // This is needed because .spec.taints does not specify patchMergeKey and patchStrategy, and adding them is no longer an option for compatibility reasons. + // Using another Patch strategy works for adding new taints, but it will not resolve the problem with taint removal. + oldNodeNoRV := oldNode.DeepCopy() + oldNodeNoRV.ResourceVersion = "" + oldDataNoRV, err := json.Marshal(&oldNodeNoRV) if err != nil { - return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err) + return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err) } newTaints := newNode.Spec.Taints @@ -102,7 +107,7 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err) } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{}) + patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{}) if err != nil { return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err) }