mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
NodeLifecycleController: Remove race condition
Patch request does not support RV by default, we need to include them explicitly and patching lists actually overwrites whole field. It means that there is a race condition, in which we can overwrite changes to taints that happened between GET and PATCH requests.
This commit is contained in:
parent
83415e5c9e
commit
de25c5fdcf
@ -1127,9 +1127,14 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str
|
|||||||
|
|
||||||
// PatchNodeTaints patches node's taints.
|
// PatchNodeTaints patches node's taints.
|
||||||
func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
|
func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
|
||||||
oldData, err := json.Marshal(oldNode)
|
// Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints.
|
||||||
|
// This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons.
|
||||||
|
// Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal.
|
||||||
|
oldNodeNoRV := oldNode.DeepCopy()
|
||||||
|
oldNodeNoRV.ResourceVersion = ""
|
||||||
|
oldDataNoRV, err := json.Marshal(&oldNodeNoRV)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
|
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
newTaints := newNode.Spec.Taints
|
newTaints := newNode.Spec.Taints
|
||||||
@ -1140,7 +1145,7 @@ func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string
|
|||||||
return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
|
return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
|
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
|
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
|
||||||
}
|
}
|
||||||
|
@ -989,6 +989,43 @@ func TestAddOrUpdateTaintOnNode(t *testing.T) {
|
|||||||
},
|
},
|
||||||
requestCount: 1,
|
requestCount: 1,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "add taint to changed node",
|
||||||
|
nodeHandler: &testutil.FakeNodeHandler{
|
||||||
|
Existing: []*v1.Node{
|
||||||
|
{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "node1",
|
||||||
|
ResourceVersion: "1",
|
||||||
|
},
|
||||||
|
Spec: v1.NodeSpec{
|
||||||
|
Taints: []v1.Taint{
|
||||||
|
{Key: "key1", Value: "value1", Effect: "NoSchedule"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
|
||||||
|
AsyncCalls: []func(*testutil.FakeNodeHandler){func(m *testutil.FakeNodeHandler) {
|
||||||
|
if len(m.UpdatedNodes) == 0 {
|
||||||
|
m.UpdatedNodes = append(m.UpdatedNodes, &v1.Node{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "node1",
|
||||||
|
ResourceVersion: "2",
|
||||||
|
},
|
||||||
|
Spec: v1.NodeSpec{
|
||||||
|
Taints: []v1.Taint{},
|
||||||
|
}})
|
||||||
|
}
|
||||||
|
}},
|
||||||
|
},
|
||||||
|
nodeName: "node1",
|
||||||
|
taintsToAdd: []*v1.Taint{{Key: "key2", Value: "value2", Effect: "NoExecute"}},
|
||||||
|
expectedTaints: []v1.Taint{
|
||||||
|
{Key: "key2", Value: "value2", Effect: "NoExecute"},
|
||||||
|
},
|
||||||
|
requestCount: 5,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
for _, test := range tests {
|
for _, test := range tests {
|
||||||
err := AddOrUpdateTaintOnNode(context.TODO(), test.nodeHandler, test.nodeName, test.taintsToAdd...)
|
err := AddOrUpdateTaintOnNode(context.TODO(), test.nodeHandler, test.nodeName, test.taintsToAdd...)
|
||||||
|
@ -54,7 +54,7 @@ const (
|
|||||||
testNodeMonitorGracePeriod = 40 * time.Second
|
testNodeMonitorGracePeriod = 40 * time.Second
|
||||||
testNodeStartupGracePeriod = 60 * time.Second
|
testNodeStartupGracePeriod = 60 * time.Second
|
||||||
testNodeMonitorPeriod = 5 * time.Second
|
testNodeMonitorPeriod = 5 * time.Second
|
||||||
testRateLimiterQPS = float32(10000)
|
testRateLimiterQPS = float32(100000)
|
||||||
testLargeClusterThreshold = 20
|
testLargeClusterThreshold = 20
|
||||||
testUnhealthyThreshold = float32(0.55)
|
testUnhealthyThreshold = float32(0.55)
|
||||||
)
|
)
|
||||||
|
@ -31,6 +31,7 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/resource"
|
"k8s.io/apimachinery/pkg/api/resource"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
"k8s.io/apimachinery/pkg/types"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||||
@ -64,6 +65,7 @@ type FakeNodeHandler struct {
|
|||||||
// Input: Hooks determine if request is valid or not
|
// Input: Hooks determine if request is valid or not
|
||||||
CreateHook func(*FakeNodeHandler, *v1.Node) bool
|
CreateHook func(*FakeNodeHandler, *v1.Node) bool
|
||||||
Existing []*v1.Node
|
Existing []*v1.Node
|
||||||
|
AsyncCalls []func(*FakeNodeHandler)
|
||||||
|
|
||||||
// Output
|
// Output
|
||||||
CreatedNodes []*v1.Node
|
CreatedNodes []*v1.Node
|
||||||
@ -131,10 +133,11 @@ func (m *FakeNodeHandler) Create(_ context.Context, node *v1.Node, _ metav1.Crea
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Get returns a Node from the fake store.
|
// Get returns a Node from the fake store.
|
||||||
func (m *FakeNodeHandler) Get(_ context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) {
|
func (m *FakeNodeHandler) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) {
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
defer func() {
|
defer func() {
|
||||||
m.RequestCount++
|
m.RequestCount++
|
||||||
|
m.runAsyncCalls()
|
||||||
m.lock.Unlock()
|
m.lock.Unlock()
|
||||||
}()
|
}()
|
||||||
for i := range m.UpdatedNodes {
|
for i := range m.UpdatedNodes {
|
||||||
@ -152,6 +155,12 @@ func (m *FakeNodeHandler) Get(_ context.Context, name string, opts metav1.GetOpt
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *FakeNodeHandler) runAsyncCalls() {
|
||||||
|
for _, a := range m.AsyncCalls {
|
||||||
|
a(m)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// List returns a list of Nodes from the fake store.
|
// List returns a list of Nodes from the fake store.
|
||||||
func (m *FakeNodeHandler) List(_ context.Context, opts metav1.ListOptions) (*v1.NodeList, error) {
|
func (m *FakeNodeHandler) List(_ context.Context, opts metav1.ListOptions) (*v1.NodeList, error) {
|
||||||
m.lock.Lock()
|
m.lock.Lock()
|
||||||
@ -212,6 +221,9 @@ func (m *FakeNodeHandler) Update(_ context.Context, node *v1.Node, _ metav1.Upda
|
|||||||
nodeCopy := *node
|
nodeCopy := *node
|
||||||
for i, updateNode := range m.UpdatedNodes {
|
for i, updateNode := range m.UpdatedNodes {
|
||||||
if updateNode.Name == nodeCopy.Name {
|
if updateNode.Name == nodeCopy.Name {
|
||||||
|
if updateNode.GetObjectMeta().GetResourceVersion() != nodeCopy.GetObjectMeta().GetResourceVersion() {
|
||||||
|
return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil)
|
||||||
|
}
|
||||||
m.UpdatedNodes[i] = &nodeCopy
|
m.UpdatedNodes[i] = &nodeCopy
|
||||||
return node, nil
|
return node, nil
|
||||||
}
|
}
|
||||||
@ -345,6 +357,9 @@ func (m *FakeNodeHandler) Patch(_ context.Context, name string, pt types.PatchTy
|
|||||||
if updatedNodeIndex < 0 {
|
if updatedNodeIndex < 0 {
|
||||||
m.UpdatedNodes = append(m.UpdatedNodes, &updatedNode)
|
m.UpdatedNodes = append(m.UpdatedNodes, &updatedNode)
|
||||||
} else {
|
} else {
|
||||||
|
if updatedNode.GetObjectMeta().GetResourceVersion() != m.UpdatedNodes[updatedNodeIndex].GetObjectMeta().GetResourceVersion() {
|
||||||
|
return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil)
|
||||||
|
}
|
||||||
m.UpdatedNodes[updatedNodeIndex] = &updatedNode
|
m.UpdatedNodes[updatedNodeIndex] = &updatedNode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -89,9 +89,14 @@ func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taints ...*v
|
|||||||
|
|
||||||
// PatchNodeTaints patches node's taints.
|
// PatchNodeTaints patches node's taints.
|
||||||
func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
|
func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
|
||||||
oldData, err := json.Marshal(oldNode)
|
// Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints.
|
||||||
|
// This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons.
|
||||||
|
// Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal.
|
||||||
|
oldNodeNoRV := oldNode.DeepCopy()
|
||||||
|
oldNodeNoRV.ResourceVersion = ""
|
||||||
|
oldDataNoRV, err := json.Marshal(&oldNodeNoRV)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
|
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
newTaints := newNode.Spec.Taints
|
newTaints := newNode.Spec.Taints
|
||||||
@ -102,7 +107,7 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n
|
|||||||
return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
|
return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
|
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
|
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user