mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 19:01:49 +00:00
Merge pull request #113136 from jprzychodzen/kcm-remove-race-condition
NodeLifecycleController: Remove race condition
This commit is contained in:
commit
245d59273c
@ -1127,9 +1127,14 @@ func RemoveTaintOffNode(ctx context.Context, c clientset.Interface, nodeName str
|
||||
|
||||
// PatchNodeTaints patches node's taints.
|
||||
func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
|
||||
oldData, err := json.Marshal(oldNode)
|
||||
// Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints.
|
||||
// This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons.
|
||||
// Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal.
|
||||
oldNodeNoRV := oldNode.DeepCopy()
|
||||
oldNodeNoRV.ResourceVersion = ""
|
||||
oldDataNoRV, err := json.Marshal(&oldNodeNoRV)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
|
||||
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err)
|
||||
}
|
||||
|
||||
newTaints := newNode.Spec.Taints
|
||||
@ -1140,7 +1145,7 @@ func PatchNodeTaints(ctx context.Context, c clientset.Interface, nodeName string
|
||||
return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
|
||||
}
|
||||
|
||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
|
||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
|
||||
}
|
||||
|
@ -989,6 +989,43 @@ func TestAddOrUpdateTaintOnNode(t *testing.T) {
|
||||
},
|
||||
requestCount: 1,
|
||||
},
|
||||
{
|
||||
name: "add taint to changed node",
|
||||
nodeHandler: &testutil.FakeNodeHandler{
|
||||
Existing: []*v1.Node{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node1",
|
||||
ResourceVersion: "1",
|
||||
},
|
||||
Spec: v1.NodeSpec{
|
||||
Taints: []v1.Taint{
|
||||
{Key: "key1", Value: "value1", Effect: "NoSchedule"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
|
||||
AsyncCalls: []func(*testutil.FakeNodeHandler){func(m *testutil.FakeNodeHandler) {
|
||||
if len(m.UpdatedNodes) == 0 {
|
||||
m.UpdatedNodes = append(m.UpdatedNodes, &v1.Node{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "node1",
|
||||
ResourceVersion: "2",
|
||||
},
|
||||
Spec: v1.NodeSpec{
|
||||
Taints: []v1.Taint{},
|
||||
}})
|
||||
}
|
||||
}},
|
||||
},
|
||||
nodeName: "node1",
|
||||
taintsToAdd: []*v1.Taint{{Key: "key2", Value: "value2", Effect: "NoExecute"}},
|
||||
expectedTaints: []v1.Taint{
|
||||
{Key: "key2", Value: "value2", Effect: "NoExecute"},
|
||||
},
|
||||
requestCount: 5,
|
||||
},
|
||||
}
|
||||
for _, test := range tests {
|
||||
err := AddOrUpdateTaintOnNode(context.TODO(), test.nodeHandler, test.nodeName, test.taintsToAdd...)
|
||||
|
@ -54,7 +54,7 @@ const (
|
||||
testNodeMonitorGracePeriod = 40 * time.Second
|
||||
testNodeStartupGracePeriod = 60 * time.Second
|
||||
testNodeMonitorPeriod = 5 * time.Second
|
||||
testRateLimiterQPS = float32(10000)
|
||||
testRateLimiterQPS = float32(100000)
|
||||
testLargeClusterThreshold = 20
|
||||
testUnhealthyThreshold = float32(0.55)
|
||||
)
|
||||
|
@ -31,6 +31,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||
@ -64,6 +65,7 @@ type FakeNodeHandler struct {
|
||||
// Input: Hooks determine if request is valid or not
|
||||
CreateHook func(*FakeNodeHandler, *v1.Node) bool
|
||||
Existing []*v1.Node
|
||||
AsyncCalls []func(*FakeNodeHandler)
|
||||
|
||||
// Output
|
||||
CreatedNodes []*v1.Node
|
||||
@ -131,10 +133,11 @@ func (m *FakeNodeHandler) Create(_ context.Context, node *v1.Node, _ metav1.Crea
|
||||
}
|
||||
|
||||
// Get returns a Node from the fake store.
|
||||
func (m *FakeNodeHandler) Get(_ context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) {
|
||||
func (m *FakeNodeHandler) Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.Node, error) {
|
||||
m.lock.Lock()
|
||||
defer func() {
|
||||
m.RequestCount++
|
||||
m.runAsyncCalls()
|
||||
m.lock.Unlock()
|
||||
}()
|
||||
for i := range m.UpdatedNodes {
|
||||
@ -152,6 +155,12 @@ func (m *FakeNodeHandler) Get(_ context.Context, name string, opts metav1.GetOpt
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (m *FakeNodeHandler) runAsyncCalls() {
|
||||
for _, a := range m.AsyncCalls {
|
||||
a(m)
|
||||
}
|
||||
}
|
||||
|
||||
// List returns a list of Nodes from the fake store.
|
||||
func (m *FakeNodeHandler) List(_ context.Context, opts metav1.ListOptions) (*v1.NodeList, error) {
|
||||
m.lock.Lock()
|
||||
@ -212,6 +221,9 @@ func (m *FakeNodeHandler) Update(_ context.Context, node *v1.Node, _ metav1.Upda
|
||||
nodeCopy := *node
|
||||
for i, updateNode := range m.UpdatedNodes {
|
||||
if updateNode.Name == nodeCopy.Name {
|
||||
if updateNode.GetObjectMeta().GetResourceVersion() != nodeCopy.GetObjectMeta().GetResourceVersion() {
|
||||
return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil)
|
||||
}
|
||||
m.UpdatedNodes[i] = &nodeCopy
|
||||
return node, nil
|
||||
}
|
||||
@ -345,6 +357,9 @@ func (m *FakeNodeHandler) Patch(_ context.Context, name string, pt types.PatchTy
|
||||
if updatedNodeIndex < 0 {
|
||||
m.UpdatedNodes = append(m.UpdatedNodes, &updatedNode)
|
||||
} else {
|
||||
if updatedNode.GetObjectMeta().GetResourceVersion() != m.UpdatedNodes[updatedNodeIndex].GetObjectMeta().GetResourceVersion() {
|
||||
return nil, apierrors.NewConflict(schema.GroupResource{}, "fake conflict", nil)
|
||||
}
|
||||
m.UpdatedNodes[updatedNodeIndex] = &updatedNode
|
||||
}
|
||||
|
||||
|
@ -89,9 +89,14 @@ func AddOrUpdateTaintOnNode(c clientset.Interface, nodeName string, taints ...*v
|
||||
|
||||
// PatchNodeTaints patches node's taints.
|
||||
func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, newNode *v1.Node) error {
|
||||
oldData, err := json.Marshal(oldNode)
|
||||
// Strip base diff node from RV to ensure that our Patch request will set RV to check for conflicts over .spec.taints.
|
||||
// This is needed because .spec.taints does not specify patchMergeKey and patchStrategy and adding them is no longer an option for compatibility reasons.
|
||||
// Using other Patch strategy works for adding new taints, however will not resolve problem with taint removal.
|
||||
oldNodeNoRV := oldNode.DeepCopy()
|
||||
oldNodeNoRV.ResourceVersion = ""
|
||||
oldDataNoRV, err := json.Marshal(&oldNodeNoRV)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNode, nodeName, err)
|
||||
return fmt.Errorf("failed to marshal old node %#v for node %q: %v", oldNodeNoRV, nodeName, err)
|
||||
}
|
||||
|
||||
newTaints := newNode.Spec.Taints
|
||||
@ -102,7 +107,7 @@ func PatchNodeTaints(c clientset.Interface, nodeName string, oldNode *v1.Node, n
|
||||
return fmt.Errorf("failed to marshal new node %#v for node %q: %v", newNodeClone, nodeName, err)
|
||||
}
|
||||
|
||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
|
||||
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldDataNoRV, newData, v1.Node{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create patch for node %q: %v", nodeName, err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user