Merge pull request #12962 from smarterclayton/evict_terminating_pods

Handle terminating pods in the node controller (6/7)
Yu-Ju Hong 2015-08-25 14:32:24 -07:00
commit 29cabbe2f2
8 changed files with 528 additions and 210 deletions

View File

@ -191,7 +191,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
// TODO: Write an integration test for the replication controllers watch.
go controllerManager.Run(3, util.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)
nodeController.Run(5 * time.Second)
cadvisorInterface := new(cadvisor.Fake)

View File

@ -199,7 +199,7 @@ func (s *CMServer) Run(_ []string) error {
}
nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, &s.ClusterCIDR, s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)

View File

@ -123,7 +123,7 @@ func (s *CMServer) Run(_ []string) error {
}
nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst),
s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
nodeController.Run(s.NodeSyncPeriod)

View File

@ -89,8 +89,11 @@ type NodeController struct {
nodeStatusMap map[string]nodeStatusData
now func() util.Time
// worker that evicts pods from unresponsive nodes.
podEvictor *PodEvictor
podEvictor *RateLimitedTimedQueue
terminationEvictor *RateLimitedTimedQueue
podEvictionTimeout time.Duration
// The maximum duration before a pod evicted from a node can be forcefully terminated.
maximumGracePeriod time.Duration
recorder record.EventRecorder
}
@ -99,7 +102,7 @@ func NewNodeController(
cloud cloudprovider.Interface,
kubeClient client.Interface,
podEvictionTimeout time.Duration,
podEvictor *PodEvictor,
podEvictionLimiter util.RateLimiter,
nodeMonitorGracePeriod time.Duration,
nodeStartupGracePeriod time.Duration,
nodeMonitorPeriod time.Duration,
@ -123,7 +126,9 @@ func NewNodeController(
kubeClient: kubeClient,
recorder: recorder,
podEvictionTimeout: podEvictionTimeout,
podEvictor: podEvictor,
maximumGracePeriod: 5 * time.Minute,
podEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
nodeStatusMap: make(map[string]nodeStatusData),
nodeMonitorGracePeriod: nodeMonitorGracePeriod,
nodeMonitorPeriod: nodeMonitorPeriod,
@ -144,39 +149,56 @@ func (nc *NodeController) Run(period time.Duration) {
}
}, nc.nodeMonitorPeriod, util.NeverStop)
// Managing eviction of nodes:
// 1. when we delete pods off a node, if the node was not empty at the time we then
// queue a termination watcher
// a. If we hit an error, retry deletion
// 2. The terminator loop ensures that pods are eventually cleaned and we never
// terminate a pod in a time period less than nc.maximumGracePeriod. AddedAt
// is the time from which we measure "has this pod been terminating too long",
// after which we will delete the pod with grace period 0 (force delete).
// a. If we hit errors, retry instantly
// b. If there are no pods left terminating, exit
// c. If there are pods still terminating, wait for their estimated completion
// before retrying
go util.Until(func() {
nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, err := nc.deletePods(value.Value)
if err != nil {
util.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0
}
if remaining {
nc.terminationEvictor.Add(value.Value)
}
return true, 0
})
}, nodeEvictionPeriod, util.NeverStop)
}
// We observed a Node deletion in etcd. Currently we only need to remove Pods that
// were assigned to it.
func (nc *NodeController) deleteNode(nodeID string) error {
return nc.deletePods(nodeID)
}
// TODO: replace with a controller that ensures pods that are terminating complete
// in a particular time period
go util.Until(func() {
nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
if err != nil {
util.HandleError(fmt.Errorf("unable to terminate pods on node %q: %v", value.Value, err))
return false, 0
}
// deletePods will delete all pods from master running on given node.
func (nc *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
fields.OneTermEqualSelector(client.PodHost, nodeID))
if err != nil {
return err
}
nc.recordNodeEvent(nodeID, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeID {
continue
}
glog.V(2).Infof("Delete pod %v", pod.Name)
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
}
}
if completed {
glog.Infof("All pods terminated on %s", value.Value)
nc.recordNodeEvent(value.Value, "TerminatedAllPods", fmt.Sprintf("Terminated all Pods on Node %s.", value.Value))
return true, 0
}
return nil
glog.V(2).Infof("Pods terminating since %s on %q, estimated completion %s", value.AddedAt, value.Value, remaining)
// clamp very short intervals
if remaining < nodeEvictionPeriod {
remaining = nodeEvictionPeriod
}
return false, remaining
})
}, nodeEvictionPeriod, util.NeverStop)
}
// Generates num pod CIDRs that could be assigned to nodes.
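Editorial note: to make the two-stage flow described in the Run comment above concrete, here is a minimal, self-contained Go sketch. It is not part of this change; the queues are reduced to plain slices and terminatingPods is a hypothetical stand-in for the pod list the controller would fetch from the API server.

package main

import (
	"fmt"
	"time"
)

// timedValue mirrors the shape of TimedValue above: a node name plus the
// time it entered the queue, used to measure how long pods have been terminating.
type timedValue struct {
	value   string
	addedAt time.Time
}

func main() {
	// Hypothetical cluster state: node name -> pods still terminating after
	// the graceful deletes were issued.
	terminatingPods := map[string]int{"node-a": 2, "node-b": 0}

	evictionQueue := []timedValue{{"node-a", time.Now()}, {"node-b", time.Now()}}
	terminationQueue := []timedValue{}

	// Stage 1: delete pods on each queued node; nodes that still had pods
	// are handed to the termination queue (step 1 of the Run comment).
	for _, v := range evictionQueue {
		fmt.Printf("deleting pods on %s\n", v.value)
		if terminatingPods[v.value] > 0 {
			terminationQueue = append(terminationQueue, v)
		}
	}

	// Stage 2: the terminator loop force deletes pods once the grace period,
	// measured from addedAt, has run out (step 2 of the Run comment).
	maximumGracePeriod := 5 * time.Minute
	for _, v := range terminationQueue {
		if time.Since(v.addedAt) > maximumGracePeriod {
			fmt.Printf("force deleting pods on %s with grace period 0\n", v.value)
		} else {
			fmt.Printf("pods on %s are still within their grace period\n", v.value)
		}
	}
}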
@ -218,6 +240,7 @@ func (nc *NodeController) monitorNodeStatus() error {
if !nc.knownNodeSet.Has(node.Name) {
glog.V(1).Infof("NodeController observed a new Node: %#v", node)
nc.recordNodeEvent(node.Name, "RegisteredNode", fmt.Sprintf("Registered Node %v in NodeController", node.Name))
nc.cancelPodEviction(node.Name)
nc.knownNodeSet.Insert(node.Name)
}
}
@ -229,11 +252,11 @@ func (nc *NodeController) monitorNodeStatus() error {
observedSet.Insert(node.Name)
}
deleted := nc.knownNodeSet.Difference(observedSet)
for node := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", node)
nc.recordNodeEvent(node, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", node))
nc.deleteNode(node)
nc.knownNodeSet.Delete(node)
for nodeName := range deleted {
glog.V(1).Infof("NodeController observed a Node deletion: %v", nodeName)
nc.recordNodeEvent(nodeName, "RemovingNode", fmt.Sprintf("Removing Node %v from NodeController", nodeName))
nc.evictPods(nodeName)
nc.knownNodeSet.Delete(nodeName)
}
}
@ -274,19 +297,19 @@ func (nc *NodeController) monitorNodeStatus() error {
// Check eviction timeout against decisionTimestamp
if lastReadyCondition.Status == api.ConditionFalse &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
if nc.podEvictor.AddNodeToEvict(node.Name) {
glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
if nc.evictPods(node.Name) {
glog.Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
}
}
if lastReadyCondition.Status == api.ConditionUnknown &&
decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
if nc.podEvictor.AddNodeToEvict(node.Name) {
glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
if nc.evictPods(node.Name) {
glog.Infof("Evicting pods on node %s: %v is later than %v + %v", node.Name, decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
}
}
if lastReadyCondition.Status == api.ConditionTrue {
if nc.podEvictor.RemoveNodeToEvict(node.Name) {
glog.Infof("Pods on %v won't be evicted", node.Name)
if nc.cancelPodEviction(node.Name) {
glog.Infof("Node %s is ready again, cancelled pod eviction", node.Name)
}
}
@ -305,11 +328,20 @@ func (nc *NodeController) monitorNodeStatus() error {
}
if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
nc.recordNodeEvent(node.Name, "DeleteingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
if err := nc.deletePods(node.Name); err != nil {
glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
nc.recordNodeEvent(node.Name, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
remaining, err := nc.hasPods(node.Name)
if err != nil {
glog.Errorf("Unable to determine whether node %s has pods, will retry: %v", node.Name, err)
continue
}
if remaining {
// queue eviction of the pods on the node
glog.V(2).Infof("Deleting node %s is delayed while pods are evicted", node.Name)
nc.evictPods(node.Name)
continue
}
if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
glog.Errorf("Unable to delete node %s: %v", node.Name, err)
continue
@ -505,3 +537,115 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
return gracePeriod, lastReadyCondition, readyCondition, err
}
// returns true if the provided node still has pods scheduled to it, or an error if
// the server could not be contacted.
func (nc *NodeController) hasPods(nodeName string) (bool, error) {
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeName))
if err != nil {
return false, err
}
return len(pods.Items) > 0, nil
}
// evictPods queues an eviction for the provided node name, and returns false if the node is already
// queued for eviction.
func (nc *NodeController) evictPods(nodeName string) bool {
return nc.podEvictor.Add(nodeName)
}
// cancelPodEviction removes any queued evictions, typically because the node is available again. It
// returns true if an eviction was queued.
func (nc *NodeController) cancelPodEviction(nodeName string) bool {
wasDeleting := nc.podEvictor.Remove(nodeName)
wasTerminating := nc.terminationEvictor.Remove(nodeName)
return wasDeleting || wasTerminating
}
// deletePods will delete all pods from master running on given node, and return true
// if any pods were deleted.
func (nc *NodeController) deletePods(nodeName string) (bool, error) {
remaining := false
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.OneTermEqualSelector(client.PodHost, nodeName))
if err != nil {
return remaining, err
}
if len(pods.Items) > 0 {
nc.recordNodeEvent(nodeName, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
}
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// if the pod has already been deleted, ignore it
if pod.DeletionGracePeriodSeconds != nil {
continue
}
glog.V(2).Infof("Delete pod %v", pod.Name)
nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeName)
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err
}
remaining = true
}
return remaining, nil
}
// terminatePods will ensure all pods on the given node that are in terminating state are eventually
// cleaned up. Returns true if the node has no pods in terminating state, a duration that indicates how
// long before we should check again (the next deadline for a pod to complete), or an error.
func (nc *NodeController) terminatePods(nodeName string, since time.Time) (bool, time.Duration, error) {
// the time before we should try again
nextAttempt := time.Duration(0)
// have we deleted all pods
complete := true
pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
fields.OneTermEqualSelector(client.PodHost, nodeName))
if err != nil {
return false, nextAttempt, err
}
now := time.Now()
elapsed := now.Sub(since)
for _, pod := range pods.Items {
// Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName {
continue
}
// only clean terminated pods
if pod.DeletionGracePeriodSeconds == nil {
continue
}
// the user's requested grace period
grace := time.Duration(*pod.DeletionGracePeriodSeconds) * time.Second
if grace > nc.maximumGracePeriod {
grace = nc.maximumGracePeriod
}
// the time remaining before the pod should have been deleted
remaining := grace - elapsed
if remaining < 0 {
remaining = 0
glog.V(2).Infof("Removing pod %v after %s grace period", pod.Name, grace)
nc.recordNodeEvent(nodeName, "TerminatingEvictedPod", fmt.Sprintf("Pod %s has exceeded the grace period for deletion after being evicted from Node %q and is being force killed", pod.Name, nodeName))
if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, api.NewDeleteOptions(0)); err != nil {
glog.Errorf("Error completing deletion of pod %s: %v", pod.Name, err)
complete = false
}
} else {
glog.V(2).Infof("Pod %v still terminating, requested grace period %s, %s remaining", pod.Name, grace, remaining)
complete = false
}
if nextAttempt < remaining {
nextAttempt = remaining
}
}
return complete, nextAttempt, nil
}
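Editorial note: a worked example of the timing arithmetic in terminatePods above. The numbers are illustrative and this snippet is not part of the change.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Suppose the node entered the termination queue 3 minutes ago and a pod
	// on it asked for a 10 minute grace period.
	elapsed := 3 * time.Minute            // now.Sub(since) in terminatePods
	grace := 10 * time.Minute             // from pod.DeletionGracePeriodSeconds
	maximumGracePeriod := 5 * time.Minute // the controller-wide cap

	if grace > maximumGracePeriod {
		grace = maximumGracePeriod // 10m requested, clamped to 5m
	}
	remaining := grace - elapsed // 5m - 3m = 2m until force deletion
	if remaining < 0 {
		remaining = 0 // grace exhausted: delete with grace period 0
	}
	fmt.Println("retry in", remaining) // feeds the nextAttempt value returned to Try
}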

View File

@ -324,9 +324,8 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
}
for _, item := range table {
podEvictor := NewPodEvictor(util.NewFakeRateLimiter())
nodeController := NewNodeController(nil, item.fakeNodeHandler,
evictionTimeout, podEvictor, testNodeMonitorGracePeriod,
evictionTimeout, util.NewFakeRateLimiter(), testNodeMonitorGracePeriod,
testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@ -340,7 +339,17 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
t.Errorf("unexpected error: %v", err)
}
podEvictor.TryEvict(func(nodeName string) { nodeController.deletePods(nodeName) })
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
remaining, _ := nodeController.deletePods(value.Value)
if remaining {
nodeController.terminationEvictor.Add(value.Value)
}
return true, 0
})
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
nodeController.terminatePods(value.Value, value.AddedAt)
return true, 0
})
podEvicted := false
for _, action := range item.fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource() == "pods" {
@ -531,7 +540,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}
for _, item := range table {
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@ -610,7 +619,7 @@ func TestNodeDeletion(t *testing.T) {
Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
}
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, util.NewFakeRateLimiter(),
testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.monitorNodeStatus(); err != nil {
@ -620,6 +629,10 @@ func TestNodeDeletion(t *testing.T) {
if err := nodeController.monitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeController.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
nodeController.deletePods(value.Value)
return true, 0
})
podEvicted := false
for _, action := range fakeNodeHandler.Actions() {
if action.GetVerb() == "delete" && action.GetResource() == "pods" {

View File

@ -1,129 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodecontroller
import (
"sync"
"k8s.io/kubernetes/pkg/util"
"github.com/golang/glog"
)
// A FIFO queue which additionally guarantees that any element can be added only once until
// it is removed.
type UniqueQueue struct {
lock sync.Mutex
queue []string
set util.StringSet
}
// Entity responsible for evicting Pods from inserted Nodes. It uses RateLimiter to avoid
// evicting everything at once. Note that we rate limit eviction of Nodes not individual Pods.
type PodEvictor struct {
queue UniqueQueue
deletingPodsRateLimiter util.RateLimiter
}
// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
// Remove call. Returns true if new value was added.
func (q *UniqueQueue) Add(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
if !q.set.Has(value) {
q.queue = append(q.queue, value)
q.set.Insert(value)
return true
} else {
return false
}
}
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
q.set.Delete(value)
for i, val := range q.queue {
if val == value {
if i > 0 && i < len(q.queue)-1 {
q.queue = append(q.queue[0:i], q.queue[i+1:len(q.queue)]...)
} else if i > 0 {
q.queue = q.queue[0 : len(q.queue)-1]
} else {
q.queue = q.queue[1:len(q.queue)]
}
return true
}
}
return false
}
// Returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (string, bool) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.queue) == 0 {
return "", false
}
result := q.queue[0]
q.queue = q.queue[1:len(q.queue)]
return result, true
}
// Creates new PodEvictor which will use given RateLimiter to oversee eviction.
func NewPodEvictor(deletingPodsRateLimiter util.RateLimiter) *PodEvictor {
return &PodEvictor{
queue: UniqueQueue{
queue: make([]string, 0),
set: util.NewStringSet(),
},
deletingPodsRateLimiter: deletingPodsRateLimiter,
}
}
// Tries to evict all Pods from previously inserted Nodes. Ends prematurely if RateLimiter forbids any eviction.
// Each Node is processed only once, as long as it's not Removed, i.e. calling multiple AddNodeToEvict does not result
// with multiple evictions as long as RemoveNodeToEvict is not called.
func (pe *PodEvictor) TryEvict(delFunc func(string)) {
val, ok := pe.queue.Get()
for ok {
if pe.deletingPodsRateLimiter.CanAccept() {
glog.Infof("PodEvictor is evicting Pods on Node: %v", val)
delFunc(val)
} else {
glog.V(1).Info("PodEvictor is rate limitted.")
break
}
val, ok = pe.queue.Get()
}
}
// Adds Node to the Evictor to be processed later. Won't add the same Node second time if it was already
// added and not removed.
func (pe *PodEvictor) AddNodeToEvict(nodeName string) bool {
return pe.queue.Add(nodeName)
}
// Removes Node from the Evictor. The Node won't be processed until added again.
func (pe *PodEvictor) RemoveNodeToEvict(nodeName string) bool {
return pe.queue.Remove(nodeName)
}

View File

@ -0,0 +1,207 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package nodecontroller
import (
"container/heap"
"sync"
"time"
"k8s.io/kubernetes/pkg/util"
)
// TimedValue is a value that should be processed at a designated time.
type TimedValue struct {
Value string
AddedAt time.Time
ProcessAt time.Time
}
// now is used to test time
var now func() time.Time = time.Now
// TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
type TimedQueue []*TimedValue
func (h TimedQueue) Len() int { return len(h) }
func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }
func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *TimedQueue) Push(x interface{}) {
*h = append(*h, x.(*TimedValue))
}
func (h *TimedQueue) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
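Editorial note: the methods above only satisfy heap.Interface; the ordering by ProcessAt comes from going through container/heap, which is how UniqueQueue drives this type below. A small illustrative sketch, not part of this change, assumed to live in the same package as TimedQueue:

package nodecontroller

import (
	"container/heap"
	"fmt"
	"time"
)

// demoTimedQueueOrdering is illustrative only: items pushed in any order come
// back ordered by ProcessAt, earliest first, because TimedQueue.Less compares ProcessAt.
func demoTimedQueueOrdering() {
	base := time.Now()
	q := TimedQueue{}
	heap.Push(&q, &TimedValue{Value: "late", ProcessAt: base.Add(time.Hour)})
	heap.Push(&q, &TimedValue{Value: "soon", ProcessAt: base.Add(time.Minute)})
	heap.Push(&q, &TimedValue{Value: "now", ProcessAt: base})
	for q.Len() > 0 {
		fmt.Println(heap.Pop(&q).(*TimedValue).Value) // now, soon, late
	}
}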
// A FIFO queue which additionally guarantees that any element can be added only once until
// it is removed.
type UniqueQueue struct {
lock sync.Mutex
queue TimedQueue
set util.StringSet
}
// Adds a new value to the queue if it wasn't added before, or was explicitly removed by the
// Remove call. Returns true if new value was added.
func (q *UniqueQueue) Add(value TimedValue) bool {
q.lock.Lock()
defer q.lock.Unlock()
if q.set.Has(value.Value) {
return false
}
heap.Push(&q.queue, &value)
q.set.Insert(value.Value)
return true
}
// Replace replaces an existing value in the queue if it already exists, otherwise it does nothing.
// Returns true if the item was found.
func (q *UniqueQueue) Replace(value TimedValue) bool {
q.lock.Lock()
defer q.lock.Unlock()
for i := range q.queue {
if q.queue[i].Value != value.Value {
continue
}
heap.Remove(&q.queue, i)
heap.Push(&q.queue, &value)
return true
}
return false
}
// Removes the value from the queue, so Get() call won't return it, and allow subsequent addition
// of the given value. If the value is not present does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
q.lock.Lock()
defer q.lock.Unlock()
q.set.Delete(value)
for i, val := range q.queue {
if val.Value == value {
heap.Remove(&q.queue, i)
return true
}
}
return false
}
// Returns the oldest added value that wasn't returned yet.
func (q *UniqueQueue) Get() (TimedValue, bool) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.queue) == 0 {
return TimedValue{}, false
}
result := heap.Pop(&q.queue).(*TimedValue)
q.set.Delete(result.Value)
return *result, true
}
// Head returns the oldest added value that wasn't returned yet without removing it.
func (q *UniqueQueue) Head() (TimedValue, bool) {
q.lock.Lock()
defer q.lock.Unlock()
if len(q.queue) == 0 {
return TimedValue{}, false
}
result := q.queue[0]
return *result, true
}
// RateLimitedTimedQueue is a unique item priority queue ordered by the expected next time
// of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
queue UniqueQueue
limiter util.RateLimiter
leak bool
}
// Creates new queue which will use given RateLimiter to oversee execution. If leak is true,
// items which are rate limited will be skipped. Otherwise, rate limited items will be requeued.
func NewRateLimitedTimedQueue(limiter util.RateLimiter, leak bool) *RateLimitedTimedQueue {
return &RateLimitedTimedQueue{
queue: UniqueQueue{
queue: TimedQueue{},
set: util.NewStringSet(),
},
limiter: limiter,
leak: leak,
}
}
// ActionFunc takes a timed value and returns false if the item must be retried, with an optional
// time.Duration if some minimum wait interval should be used.
type ActionFunc func(TimedValue) (bool, time.Duration)
// Try processes the queue. Ends prematurely if RateLimiter forbids an action and leak is true.
// Otherwise, requeues the item to be processed. Each value is processed once if fn returns true,
// otherwise it is added back to the queue. The returned remaining is used to identify the minimum
// time to execute the next item in the queue.
func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
val, ok := q.queue.Head()
for ok {
// rate limit the queue checking
if q.leak {
if !q.limiter.CanAccept() {
break
}
} else {
q.limiter.Accept()
}
now := now()
if now.Before(val.ProcessAt) {
q.queue.Replace(val)
val, ok = q.queue.Head()
// we do not sleep here because other values may be added at the front of the queue
continue
}
if ok, wait := fn(val); !ok {
val.ProcessAt = now.Add(wait + 1)
q.queue.Replace(val)
} else {
q.queue.Remove(val.Value)
}
val, ok = q.queue.Head()
}
}
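Editorial note: a short usage sketch of the Add/Try contract shown above, not part of this change and assumed to sit in the same package. Returning (false, wait) keeps the value queued and retries it after roughly wait; returning (true, 0) removes it.

package nodecontroller

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/util"
)

// demoTryUsage is illustrative only. node-a "fails" on its first attempt and is
// retried by Try after about a millisecond; node-b succeeds immediately.
func demoTryUsage() {
	q := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
	q.Add("node-a")
	q.Add("node-b")

	attempts := map[string]int{}
	q.Try(func(value TimedValue) (bool, time.Duration) {
		attempts[value.Value]++
		if value.Value == "node-a" && attempts[value.Value] == 1 {
			return false, time.Millisecond // simulate a failure; retry shortly
		}
		fmt.Printf("processed %s after %d attempt(s)\n", value.Value, attempts[value.Value])
		return true, 0
	})
}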
// Adds value to the queue to be processed. Won't add the same value a second time if it was already
// added and not removed.
func (q *RateLimitedTimedQueue) Add(value string) bool {
now := now()
return q.queue.Add(TimedValue{
Value: value,
AddedAt: now,
ProcessAt: now,
})
}
// Removes Node from the Evictor. The Node won't be processed until added again.
func (q *RateLimitedTimedQueue) Remove(value string) bool {
return q.queue.Remove(value)
}

View File

@ -17,14 +17,16 @@ limitations under the License.
package nodecontroller
import (
"reflect"
"testing"
"time"
"k8s.io/kubernetes/pkg/util"
)
func CheckQueueEq(lhs, rhs []string) bool {
func CheckQueueEq(lhs []string, rhs TimedQueue) bool {
for i := 0; i < len(lhs); i++ {
if rhs[i] != lhs[i] {
if rhs[i].Value != lhs[i] {
return false
}
}
@ -36,10 +38,10 @@ func CheckSetEq(lhs, rhs util.StringSet) bool {
}
func TestAddNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
queuePattern := []string{"first", "second", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
@ -59,11 +61,11 @@ func TestAddNode(t *testing.T) {
}
func TestDelNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("first")
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Remove("first")
queuePattern := []string{"second", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
@ -81,11 +83,11 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}
evictor = NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("second")
evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Remove("second")
queuePattern = []string{"first", "third"}
if len(evictor.queue.queue) != len(queuePattern) {
@ -103,11 +105,11 @@ func TestDelNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
}
evictor = NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("third")
evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Remove("third")
queuePattern = []string{"first", "second"}
if len(evictor.queue.queue) != len(queuePattern) {
@ -126,15 +128,18 @@ func TestDelNode(t *testing.T) {
}
}
func TestEvictNode(t *testing.T) {
evictor := NewPodEvictor(util.NewFakeRateLimiter())
evictor.AddNodeToEvict("first")
evictor.AddNodeToEvict("second")
evictor.AddNodeToEvict("third")
evictor.RemoveNodeToEvict("second")
func TestTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
evictor.Remove("second")
deletedMap := util.NewStringSet()
evictor.TryEvict(func(nodeName string) { deletedMap.Insert(nodeName) })
evictor.Try(func(value TimedValue) (bool, time.Duration) {
deletedMap.Insert(value.Value)
return true, 0
})
setPattern := util.NewStringSet("first", "third")
if len(deletedMap) != len(setPattern) {
@ -144,3 +149,81 @@ func TestEvictNode(t *testing.T) {
t.Errorf("Invalid map. Got %v, expected %v", deletedMap, setPattern)
}
}
func TestTryOrdering(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
order := []string{}
count := 0
queued := false
evictor.Try(func(value TimedValue) (bool, time.Duration) {
count++
if value.AddedAt.IsZero() {
t.Fatalf("added should not be zero")
}
if value.ProcessAt.IsZero() {
t.Fatalf("next should not be zero")
}
if !queued && value.Value == "second" {
queued = true
return false, time.Millisecond
}
order = append(order, value.Value)
return true, 0
})
if !reflect.DeepEqual(order, []string{"first", "third", "second"}) {
t.Fatalf("order was wrong: %v", order)
}
if count != 4 {
t.Fatalf("unexpected iterations: %d", count)
}
}
func TestTryRemovingWhileTry(t *testing.T) {
evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
evictor.Add("first")
evictor.Add("second")
evictor.Add("third")
processing := make(chan struct{})
wait := make(chan struct{})
order := []string{}
count := 0
queued := false
// while the Try function is processing "second", remove it from the queue
// we should not see "second" retried.
go func() {
<-processing
evictor.Remove("second")
close(wait)
}()
evictor.Try(func(value TimedValue) (bool, time.Duration) {
count++
if value.AddedAt.IsZero() {
t.Fatalf("added should not be zero")
}
if value.ProcessAt.IsZero() {
t.Fatalf("next should not be zero")
}
if !queued && value.Value == "second" {
queued = true
close(processing)
<-wait
return false, time.Millisecond
}
order = append(order, value.Value)
return true, 0
})
if !reflect.DeepEqual(order, []string{"first", "third"}) {
t.Fatalf("order was wrong: %v", order)
}
if count != 3 {
t.Fatalf("unexpected iterations: %d", count)
}
}