Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-23 11:50:44 +00:00
Merge pull request #84445 from krzysied/node_controller_retry_fix

NodeLifecycleController - MarkPodsNotReady retry fix

Until now, when marking a node's pods NotReady failed, the controller only logged the error and never tried again. This change records such nodes in a new nodesToRetry sync.Map and retries markPodsNotReady on subsequent monitorNodeHealth passes until the update succeeds; a new test covers both the no-retry and retry paths.

Commit: 5e33f3db5f
@@ -70,6 +70,7 @@ go_test(
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library",
         "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
         "//staging/src/k8s.io/client-go/informers:go_default_library",
@@ -236,6 +236,8 @@ type Controller struct {
 	// workers that are responsible for tainting nodes.
 	zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
 
+	nodesToRetry sync.Map
+
 	zoneStates map[string]ZoneState
 
 	daemonSetStore appsv1listers.DaemonSetLister
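The new nodesToRetry field is a standard-library sync.Map keyed by node name; the controller only ever stores an empty struct{} and checks membership with Load. A minimal, self-contained sketch of that membership-set pattern (the node name is made up for illustration):

package main

import (
	"fmt"
	"sync"
)

func main() {
	// The zero value of sync.Map is empty and ready to use.
	var nodesToRetry sync.Map

	// Record that pod-status updates for "node0" still need a retry.
	nodesToRetry.Store("node0", struct{}{})

	// Membership check: only the second return value matters here.
	if _, needsRetry := nodesToRetry.Load("node0"); needsRetry {
		fmt.Println("node0 needs a MarkPodsNotReady retry")
	}

	// Once the retry succeeds (or the node is deleted), drop the entry.
	nodesToRetry.Delete("node0")

	if _, needsRetry := nodesToRetry.Load("node0"); !needsRetry {
		fmt.Println("node0 no longer tracked")
	}
}

A concurrency-safe map is a natural fit here because, in this patch, the node informer's DeleteFunc and monitorNodeHealth both touch the map, presumably from different goroutines, without adding a mutex to the Controller.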
@@ -347,6 +349,7 @@ func NewNodeLifecycleController(
 		nodeMonitorGracePeriod:      nodeMonitorGracePeriod,
 		zonePodEvictor:              make(map[string]*scheduler.RateLimitedTimedQueue),
 		zoneNoExecuteTainter:        make(map[string]*scheduler.RateLimitedTimedQueue),
+		nodesToRetry:                sync.Map{},
 		zoneStates:                  make(map[string]ZoneState),
 		podEvictionTimeout:          podEvictionTimeout,
 		evictionLimiterQPS:          evictionLimiterQPS,
@@ -462,6 +465,10 @@ func NewNodeLifecycleController(
 			nc.nodeUpdateQueue.Add(newNode.Name)
 			return nil
 		}),
+		DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
+			nc.nodesToRetry.Delete(node.Name)
+			return nil
+		}),
 	})
 
 	nc.leaseLister = leaseInformer.Lister()
@@ -780,25 +787,38 @@ func (nc *Controller) monitorNodeHealth() error {
 				nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod)
 			}
 
-			// Report node event.
-			if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
+			_, needsRetry := nc.nodesToRetry.Load(node.Name)
+			switch {
+			case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
+				// Report node event only once when status changed.
 				nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
-				pods, err := listPodsFromNode(nc.kubeClient, node.Name)
-				if err != nil {
-					utilruntime.HandleError(fmt.Errorf("Unable to list pods from node %v: %v", node.Name, err))
+				fallthrough
+			case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
+				if err := nc.markPodsNotReady(node.Name); err != nil {
+					utilruntime.HandleError(err)
+					nc.nodesToRetry.Store(node.Name, struct{}{})
 					continue
 				}
-				if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, node.Name); err != nil {
-					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
-				}
 			}
 		}
+		nc.nodesToRetry.Delete(node.Name)
 	}
 	nc.handleDisruption(zoneToNodeConditions, nodes)
 
 	return nil
 }
 
+func (nc *Controller) markPodsNotReady(nodeName string) error {
+	pods, err := listPodsFromNode(nc.kubeClient, nodeName)
+	if err != nil {
+		return fmt.Errorf("unable to list pods from node %v: %v", nodeName, err)
+	}
+	if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil {
+		return fmt.Errorf("unable to mark all pods NotReady on node %v: %v", nodeName, err)
+	}
+	return nil
+}
+
 func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition) {
 	decisionTimestamp := nc.now()
 	// Check eviction timeout against decisionTimestamp
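The retry logic hinges on Go's fallthrough in an expressionless switch: when the node has just transitioned to NotReady, the first case records the event and then falls through, so the same markPodsNotReady call serves both the first attempt and later retries. A small standalone sketch of that control flow, with statusJustChanged and needsRetry as hypothetical stand-ins for the controller's conditions and a local map standing in for nodesToRetry:

package main

import "fmt"

// markPodsNotReady stands in for the controller's helper; it fails on the
// first attempt so the retry branch is exercised.
var attempts int

func markPodsNotReady(node string) error {
	attempts++
	if attempts == 1 {
		return fmt.Errorf("fake error marking pods on %s", node)
	}
	return nil
}

func handleNode(node string, statusJustChanged, needsRetry bool, retrySet map[string]struct{}) {
	switch {
	case statusJustChanged:
		fmt.Println("recording NodeNotReady event for", node)
		fallthrough // also run the next case body on a fresh transition
	case needsRetry:
		if err := markPodsNotReady(node); err != nil {
			fmt.Println("failed, will retry next pass:", err)
			retrySet[node] = struct{}{}
			return
		}
	}
	delete(retrySet, node) // success (or nothing to do): stop tracking the node
}

func main() {
	retrySet := map[string]struct{}{}
	// First pass: status just changed, the update fails, the node is tracked.
	handleNode("node0", true, false, retrySet)
	// Second pass: status unchanged but a retry is pending; it succeeds.
	_, pending := retrySet["node0"]
	handleNode("node0", false, pending, retrySet)
	fmt.Println("still tracked:", len(retrySet) != 0)
}

Note that fallthrough executes the next case body regardless of its condition, which is why the "status just changed" case must come before the retry case, exactly as ordered in the diff above.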
@@ -30,6 +30,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/util/diff"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	"k8s.io/client-go/informers"
@@ -2389,6 +2390,149 @@ func TestMonitorNodeHealthMarkPodsNotReady(t *testing.T) {
 	}
 }
 
+func TestMonitorNodeHealthMarkPodsNotReadyRetry(t *testing.T) {
+	type nodeIteration struct {
+		timeToPass time.Duration
+		newNodes   []*v1.Node
+	}
+	timeNow := metav1.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
+	timePlusTwoMinutes := metav1.Date(2015, 1, 1, 12, 0, 2, 0, time.UTC)
+	makeNodes := func(status v1.ConditionStatus, lastHeartbeatTime, lastTransitionTime metav1.Time) []*v1.Node {
+		return []*v1.Node{
+			{
+				ObjectMeta: metav1.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: timeNow,
+				},
+				Status: v1.NodeStatus{
+					Conditions: []v1.NodeCondition{
+						{
+							Type:               v1.NodeReady,
+							Status:             status,
+							LastHeartbeatTime:  lastHeartbeatTime,
+							LastTransitionTime: lastTransitionTime,
+						},
+					},
+				},
+			},
+		}
+	}
+	table := []struct {
+		desc                     string
+		fakeNodeHandler          *testutil.FakeNodeHandler
+		updateReactor            func(action testcore.Action) (bool, runtime.Object, error)
+		nodeIterations           []nodeIteration
+		expectedPodStatusUpdates int
+	}{
+		// Node created long time ago, with status updated by kubelet exceeds grace period.
+		// First monitorNodeHealth check will update pod status to NotReady.
+		// Second monitorNodeHealth check will do no updates (no retry).
+		{
+			desc: "successful pod status update, no retry required",
+			fakeNodeHandler: &testutil.FakeNodeHandler{
+				Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
+			},
+			nodeIterations: []nodeIteration{
+				{
+					timeToPass: 0,
+					newNodes:   makeNodes(v1.ConditionTrue, timeNow, timeNow),
+				},
+				{
+					timeToPass: 1 * time.Minute,
+					newNodes:   makeNodes(v1.ConditionTrue, timeNow, timeNow),
+				},
+				{
+					timeToPass: 1 * time.Minute,
+					newNodes:   makeNodes(v1.ConditionFalse, timePlusTwoMinutes, timePlusTwoMinutes),
+				},
+			},
+			expectedPodStatusUpdates: 1,
+		},
+		// Node created long time ago, with status updated by kubelet exceeds grace period.
+		// First monitorNodeHealth check will fail to update pod status to NotReady.
+		// Second monitorNodeHealth check will update pod status to NotReady (retry).
+		{
+			desc: "unsuccessful pod status update, retry required",
+			fakeNodeHandler: &testutil.FakeNodeHandler{
+				Clientset: fake.NewSimpleClientset(&v1.PodList{Items: []v1.Pod{*testutil.NewPod("pod0", "node0")}}),
+			},
+			updateReactor: func() func(action testcore.Action) (bool, runtime.Object, error) {
+				i := 0
+				return func(action testcore.Action) (bool, runtime.Object, error) {
+					if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" {
+						i++
+						switch i {
+						case 1:
+							return true, nil, fmt.Errorf("fake error")
+						default:
+							return true, testutil.NewPod("pod0", "node0"), nil
+						}
+					}
+
+					return true, nil, fmt.Errorf("unsupported action")
+				}
+			}(),
+			nodeIterations: []nodeIteration{
+				{
+					timeToPass: 0,
+					newNodes:   makeNodes(v1.ConditionTrue, timeNow, timeNow),
+				},
+				{
+					timeToPass: 1 * time.Minute,
+					newNodes:   makeNodes(v1.ConditionTrue, timeNow, timeNow),
+				},
+				{
+					timeToPass: 1 * time.Minute,
+					newNodes:   makeNodes(v1.ConditionFalse, timePlusTwoMinutes, timePlusTwoMinutes),
+				},
+			},
+			expectedPodStatusUpdates: 2, // One failed and one retry.
+		},
+	}
+
+	for _, item := range table {
+		t.Run(item.desc, func(t *testing.T) {
+			nodeController, _ := newNodeLifecycleControllerFromClient(
+				item.fakeNodeHandler,
+				5*time.Minute,
+				testRateLimiterQPS,
+				testRateLimiterQPS,
+				testLargeClusterThreshold,
+				testUnhealthyThreshold,
+				testNodeMonitorGracePeriod,
+				testNodeStartupGracePeriod,
+				testNodeMonitorPeriod,
+				false)
+			if item.updateReactor != nil {
+				item.fakeNodeHandler.Clientset.PrependReactor("update", "pods", item.updateReactor)
+			}
+			nodeController.now = func() metav1.Time { return timeNow }
+			nodeController.recorder = testutil.NewFakeRecorder()
+			nodeController.getPodsAssignedToNode = fakeGetPodsAssignedToNode(item.fakeNodeHandler.Clientset)
+			for _, iteration := range item.nodeIterations {
+				nodeController.now = func() metav1.Time { return metav1.Time{Time: timeNow.Add(iteration.timeToPass)} }
+				item.fakeNodeHandler.Existing = iteration.newNodes
+				if err := nodeController.syncNodeStore(item.fakeNodeHandler); err != nil {
+					t.Errorf("unexpected error: %v", err)
+				}
+				if err := nodeController.monitorNodeHealth(); err != nil {
+					t.Errorf("unexpected error: %v", err)
+				}
+			}
+
+			podStatusUpdates := 0
+			for _, action := range item.fakeNodeHandler.Actions() {
+				if action.GetVerb() == "update" && action.GetResource().Resource == "pods" && action.GetSubresource() == "status" {
+					podStatusUpdates++
+				}
+			}
+			if podStatusUpdates != item.expectedPodStatusUpdates {
+				t.Errorf("expect pod status updated to happen %d times, but got %d", item.expectedPodStatusUpdates, podStatusUpdates)
+			}
+		})
+	}
+}
+
 // TestApplyNoExecuteTaints, ensures we just have a NoExecute taint applied to node.
 // NodeController is just responsible for enqueuing the node to tainting queue from which taint manager picks up
 // and evicts the pods on the node.