diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index f63dea29efe..596606ec517 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -149,6 +149,18 @@ func (nc *NodeController) Run(period time.Duration) {
 		}
 	}, nc.nodeMonitorPeriod, util.NeverStop)
 
+	// Managing eviction of nodes:
+	// 1. when we delete pods off a node, if the node was not empty at the time we then
+	//    queue a termination watcher
+	//    a. If we hit an error, retry deletion
+	// 2. The terminator loop ensures that pods are eventually cleaned and we never
+	//    terminate a pod in a time period less than nc.maximumGracePeriod. AddedAt
+	//    is the time from which we measure "has this pod been terminating too long",
+	//    after which we will delete the pod with grace period 0 (force delete).
+	//    a. If we hit errors, retry instantly
+	//    b. If there are no pods left terminating, exit
+	//    c. If there are pods still terminating, wait for their estimated completion
+	//       before retrying
 	go util.Until(func() {
 		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 			remaining, err := nc.deletePods(value.Value)
@@ -167,19 +179,19 @@ func (nc *NodeController) Run(period time.Duration) {
 	// in a particular time period
 	go util.Until(func() {
 		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			completed, remaining, err := nc.terminatePods(value.Value, value.Added)
+			completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
 			if err != nil {
 				util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
 				return false, 0
 			}
 
 			if completed {
-				glog.V(2).Infof("All pods terminated on %s", value.Value)
+				glog.Infof("All pods terminated on %s", value.Value)
 				nc.recordNodeEvent(value.Value, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
 				return true, 0
 			}
 
-			glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.Added, value.Value, remaining)
+			glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
 			// clamp very short intervals
 			if remaining < nodeEvictionPeriod {
 				remaining = nodeEvictionPeriod
@@ -228,6 +240,7 @@ func (nc *NodeController) monitorNodeStatus() error {
 		if !nc.knownNodeSet.Has(node.Name) {
 			glog.V(1).Infof("NodeController observed a new Node: %#v", node)
 			nc.recordNodeEvent(node.Name, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
+			nc.cancelPodEviction(node.Name)
 			nc.knownNodeSet.Insert(node.Name)
 		}
 	}
@@ -239,11 +252,11 @@ func (nc *NodeController) monitorNodeStatus() error {
 			observedSet.Insert(node.Name)
 		}
 		deleted := nc.knownNodeSet.Difference(observedSet)
-		for node := range deleted {
-			glog.V(1).Infof("NodeController observed a Node deletion: %v", node)
-			nc.recordNodeEvent(node, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", node))
-			nc.podEvictor.Add(node)
-			nc.knownNodeSet.Delete(node)
+		for nodeName := range deleted {
+			glog.V(1).Infof("NodeController observed a Node deletion: %v", nodeName)
+			nc.recordNodeEvent(nodeName, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
+			nc.evictPods(nodeName)
+			nc.knownNodeSet.Delete(nodeName)
 		}
 	}
 
@@ -284,21 +297,19 @@ func (nc *NodeController) monitorNodeStatus() error {
 			// Check eviction timeout against decisionTimestamp
 			if lastReadyCondition.Status == api.ConditionFalse &&
 				decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
-				if nc.podEvictor.Add(node.Name) {
-					glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
+				if nc.evictPods(node.Name) {
+					glog.Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
 				}
 			}
 			if lastReadyCondition.Status == api.ConditionUnknown &&
 				decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
-				if nc.podEvictor.Add(node.Name) {
-					glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
+				if nc.evictPods(node.Name) {
+					glog.Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
 				}
 			}
 			if lastReadyCondition.Status == api.ConditionTrue {
-				wasDeleting := nc.podEvictor.Remove(node.Name)
-				wasTerminating := nc.terminationEvictor.Remove(node.Name)
-				if wasDeleting || wasTerminating {
-					glog.Infof("Pods on %v won't be evicted", node.Name)
+				if nc.cancelPodEviction(node.Name) {
+					glog.Infof("Node %s is ready again, cancelled pod eviction", node.Name)
 				}
 			}
 
@@ -326,8 +337,8 @@ func (nc *NodeController) monitorNodeStatus() error {
 			}
 			if remaining {
 				// queue eviction of the pods on the node
-				glog.Infof("Deleting node %s is delayed while pods are evicted", node.Name)
-				nc.podEvictor.Add(node.Name)
+				glog.V(2).Infof("Deleting node %s is delayed while pods are evicted", node.Name)
+				nc.evictPods(node.Name)
 				continue
 			}
 
@@ -529,30 +540,44 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 
 // returns true if the provided node still has pods scheduled to it, or an error if
 // the server could not be contacted.
-func (nc *NodeController) hasPods(nodeID string) (bool, error) {
-	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeID))
+func (nc *NodeController) hasPods(nodeName string) (bool, error) {
+	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeName))
 	if err != nil {
 		return false, err
 	}
 	return len(pods.Items) > 0, nil
 }
 
+// evictPods queues an eviction for the provided node name, and returns false if the node is already
+// queued for eviction.
+func (nc *NodeController) evictPods(nodeName string) bool {
+	return nc.podEvictor.Add(nodeName)
+}
+
+// cancelPodEviction removes any queued evictions, typically because the node is available again. It
+// returns true if an eviction was queued.
+func (nc *NodeController) cancelPodEviction(nodeName string) bool {
+	wasDeleting := nc.podEvictor.Remove(nodeName)
+	wasTerminating := nc.terminationEvictor.Remove(nodeName)
+	return wasDeleting || wasTerminating
+}
+
 // deletePods will delete all pods from master running on given node, and return true
 // if any pods were deleted.
-func (nc *NodeController) deletePods(nodeID string) (bool, error) {
+func (nc *NodeController) deletePods(nodeName string) (bool, error) {
 	remaining := false
-	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeID))
+	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeName))
 	if err != nil {
 		return remaining, err
 	}
 
 	if len(pods.Items) > 0 {
-		nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
+		nc.recordNodeEvent(nodeName, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
 	}
 
 	for _, pod := range pods.Items {
 		// Defensive check, also needed for tests.
-		if pod.Spec.NodeName != nodeID {
+		if pod.Spec.NodeName != nodeName {
 			continue
 		}
 		// if the pod has already been deleted, ignore it
@@ -561,7 +586,7 @@ func (nc *NodeController) deletePods(nodeID string) (bool, error) {
 		}
 
 		glog.V(2).Infof("Delete pod %v", pod.Name)
-		nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
+		nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeName)
 		if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
 			return false, err
 		}
@@ -571,22 +596,25 @@ func (nc *NodeController) deletePods(nodeID string) (bool, error) {
 }
 
 // terminatePods will ensure all pods on the given node that are in terminating state are eventually
-// cleaned up
-func (nc *NodeController) terminatePods(nodeID string, since time.Time) (bool, time.Duration, error) {
-	remaining := time.Duration(0)
+// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
+// long before we should check again (the next deadline for a pod to complete), or an error.
+func (nc *NodeController) terminatePods(nodeName string, since time.Time) (bool, time.Duration, error) {
+	// the time before we should try again
+	nextAttempt := time.Duration(0)
+	// have we deleted all pods
 	complete := true
 
 	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
-		fields.OneTermEqualSelector(client.PodHost, nodeID))
+		fields.OneTermEqualSelector(client.PodHost, nodeName))
 	if err != nil {
-		return false, remaining, err
+		return false, nextAttempt, err
 	}
 
 	now := time.Now()
 	elapsed := now.Sub(since)
 	for _, pod := range pods.Items {
 		// Defensive check, also needed for tests.
-		if pod.Spec.NodeName != nodeID {
+		if pod.Spec.NodeName != nodeName {
 			continue
 		}
 		// only clean terminated pods
@@ -594,28 +622,30 @@ func (nc *NodeController) terminatePods(nodeID string, since time.Time) (bool, t
 			continue
 		}
 
+		// the user's requested grace period
 		grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
 		if grace > nc.maximumGracePeriod {
 			grace = nc.maximumGracePeriod
 		}
 
-		next := grace - elapsed
-		if next < 0 {
-			next = 0
+		// the time remaining before the pod should have been deleted
+		remaining := grace - elapsed
+		if remaining < 0 {
+			remaining = 0
 			glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
-			nc.recordNodeEvent(nodeID, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeID))
+			nc.recordNodeEvent(nodeName, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
 			if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
 				glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
 				complete = false
 			}
 		} else {
-			glog.V(2).Infof("Pod %v still terminating with %s remaining", pod.Name, next)
+			glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
 			complete = false
 		}
 
-		if remaining < next {
-			remaining = next
+		if nextAttempt < remaining {
+			nextAttempt = remaining
 		}
 	}
-	return complete, remaining, nil
+	return complete, nextAttempt, nil
 }
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index b10168b314c..2b58083cba1 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -347,7 +347,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 			return true, 0
 		})
 		nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
-			nodeController.terminatePods(value.Value, value.Added)
+			nodeController.terminatePods(value.Value, value.AddedAt)
 			return true, 0
 		})
 		podEvicted := false
diff --git a/pkg/controller/node/rate_limited_queue.go b/pkg/controller/node/rate_limited_queue.go
index e7fc9cc7475..550b646941f 100644
--- a/pkg/controller/node/rate_limited_queue.go
+++ b/pkg/controller/node/rate_limited_queue.go
@@ -26,19 +26,19 @@ import (
 
 // TimedValue is a value that should be processed at a designated time.
 type TimedValue struct {
-	Value string
-	Added time.Time
-	Next  time.Time
+	Value     string
+	AddedAt   time.Time
+	ProcessAt time.Time
 }
 
 // now is used to test time
 var now func() time.Time = time.Now
 
-// TimedQueue is a priority heap where the lowest Next is at the front of the queue
+// TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
 type TimedQueue []*TimedValue
 
 func (h TimedQueue) Len() int           { return len(h) }
-func (h TimedQueue) Less(i, j int) bool { return h[i].Next.Before(h[j].Next) }
+func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
 func (h TimedQueue) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
 
 func (h *TimedQueue) Push(x interface{}) {
@@ -75,6 +75,23 @@ func (q *UniqueQueue) Add(value TimedValue) bool {
 	return true
 }
 
+// Replace replaces an existing value in the queue if it already exists, otherwise it does nothing.
+// Returns true if the item was found.
+func (q *UniqueQueue) Replace(value TimedValue) bool {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+
+	for i := range q.queue {
+		if q.queue[i].Value != value.Value {
+			continue
+		}
+		heap.Remove(&q.queue, i)
+		heap.Push(&q.queue, &value)
+		return true
+	}
+	return false
+}
+
 // Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
 // of the given value. If the value is not present does nothing and returns false.
 func (q *UniqueQueue) Remove(value string) bool {
@@ -103,6 +120,17 @@ func (q *UniqueQueue) Get() (TimedValue, bool) {
 	return *result, true
 }
 
+// Head returns the oldest added value that wasn't returned yet without removing it.
+func (q *UniqueQueue) Head() (TimedValue, bool) {
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if len(q.queue) == 0 {
+		return TimedValue{}, false
+	}
+	result := q.queue[0]
+	return *result, true
+}
+
 // RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
 // of execution. It is also rate limited.
 type RateLimitedTimedQueue struct {
@@ -133,7 +161,7 @@ type ActionFunc func(TimedValue) (bool, time.Duration)
 // otherwise it is added back to the queue. The returned remaining is used to identify the minimum
 // time to execute the next item in the queue.
 func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
-	val, ok := q.queue.Get()
+	val, ok := q.queue.Head()
 	for ok {
 		// rate limit the queue checking
 		if q.leak {
@@ -145,18 +173,20 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 		}
 
 		now := now()
-		if now.Before(val.Next) {
-			q.queue.Add(val)
-			val, ok = q.queue.Get()
+		if now.Before(val.ProcessAt) {
+			q.queue.Replace(val)
+			val, ok = q.queue.Head()
 			// we do not sleep here because other values may be added at the front of the queue
 			continue
 		}
 
 		if ok, wait := fn(val); !ok {
-			val.Next = now.Add(wait + 1)
-			q.queue.Add(val)
+			val.ProcessAt = now.Add(wait + 1)
+			q.queue.Replace(val)
+		} else {
+			q.queue.Remove(val.Value)
 		}
 
-		val, ok = q.queue.Get()
+		val, ok = q.queue.Head()
 	}
 }
 
@@ -165,9 +195,9 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 func (q *RateLimitedTimedQueue) Add(value string) bool {
 	now := now()
 	return q.queue.Add(TimedValue{
-		Value: value,
-		Added: now,
-		Next:  now,
+		Value:     value,
+		AddedAt:   now,
+		ProcessAt: now,
 	})
 }
 
diff --git a/pkg/controller/node/rate_limited_queue_test.go b/pkg/controller/node/rate_limited_queue_test.go
index e6e8eecd654..a3a49905255 100644
--- a/pkg/controller/node/rate_limited_queue_test.go
+++ b/pkg/controller/node/rate_limited_queue_test.go
@@ -161,10 +161,10 @@ func TestTryOrdering(t *testing.T) {
 	queued := false
 	evictor.Try(func(value TimedValue) (bool, time.Duration) {
 		count++
-		if value.Added.IsZero() {
+		if value.AddedAt.IsZero() {
 			t.Fatalf("added should not be zero")
 		}
-		if value.Next.IsZero() {
+		if value.ProcessAt.IsZero() {
 			t.Fatalf("next should not be zero")
 		}
 		if !queued && value.Value == "second" {
@@ -181,3 +181,49 @@ func TestTryOrdering(t *testing.T) {
 		t.Fatalf("unexpected iterations: %d", count)
 	}
 }
+
+func TestTryRemovingWhileTry(t *testing.T) {
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
+	evictor.Add("first")
+	evictor.Add("second")
+	evictor.Add("third")
+
+	processing := make(chan struct{})
+	wait := make(chan struct{})
+	order := []string{}
+	count := 0
+	queued := false
+
+	// while the Try function is processing "second", remove it from the queue
+	// we should not see "second" retried.
+	go func() {
+		<-processing
+		evictor.Remove("second")
+		close(wait)
+	}()
+
+	evictor.Try(func(value TimedValue) (bool, time.Duration) {
+		count++
+		if value.AddedAt.IsZero() {
+			t.Fatalf("added should not be zero")
+		}
+		if value.ProcessAt.IsZero() {
+			t.Fatalf("next should not be zero")
+		}
+		if !queued && value.Value == "second" {
+			queued = true
+			close(processing)
+			<-wait
+			return false, time.Millisecond
+		}
+		order = append(order, value.Value)
+		return true, 0
+	})
+
+	if !reflect.DeepEqual(order, []string{"first", "third"}) {
+		t.Fatalf("order was wrong: %v", order)
+	}
+	if count != 3 {
+		t.Fatalf("unexpected iterations: %d", count)
+	}
+}
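Reviewer note (not part of the patch): the sketch below walks through the bookkeeping that the renamed values in terminatePods make explicit. Each pod's requested grace period is clamped to the controller's maximumGracePeriod, pods whose grace period has already elapsed are force deleted, and the largest remaining grace period becomes nextAttempt, the delay the termination evictor waits (clamped to nodeEvictionPeriod, as in the Run hunk above) before retrying the node. The pod durations and the nodeEvictionPeriod value here are fabricated for illustration; only the arithmetic mirrors the patch.

// Standalone illustration only; not controller code. All durations are made up.
package main

import (
	"fmt"
	"time"
)

func main() {
	const maximumGracePeriod = 5 * time.Minute        // stand-in cap on how long a pod's grace period is honored
	const nodeEvictionPeriod = 100 * time.Millisecond // stand-in for the minimum retry interval

	// Hypothetical terminating pods: the grace period each one requested.
	requestedGrace := []time.Duration{30 * time.Second, 10 * time.Minute}
	// How long the node's pods have been terminating (now minus the queue's AddedAt).
	elapsed := 2 * time.Minute

	nextAttempt := time.Duration(0) // how long before the evictor should revisit this node
	complete := true                // true once every pod has passed its (clamped) deadline

	for _, grace := range requestedGrace {
		if grace > maximumGracePeriod {
			grace = maximumGracePeriod
		}
		remaining := grace - elapsed
		if remaining < 0 {
			remaining = 0
			fmt.Printf("pod exceeded its %v grace period: force delete\n", grace)
		} else {
			fmt.Printf("pod still terminating, %v of its %v grace period left\n", remaining, grace)
			complete = false
		}
		if nextAttempt < remaining {
			nextAttempt = remaining
		}
	}

	// Clamp very short intervals, as the Try callback in Run does.
	if nextAttempt < nodeEvictionPeriod {
		nextAttempt = nodeEvictionPeriod
	}
	fmt.Printf("complete=%v, retry node in %v\n", complete, nextAttempt)
}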