diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index 596606ec517..2da83b9ab3d 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -20,6 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"sync"
 	"time"
 
 	"github.com/golang/glog"
@@ -88,7 +89,9 @@ type NodeController struct {
 	// to aviod the problem with time skew across the cluster.
 	nodeStatusMap map[string]nodeStatusData
 	now           func() util.Time
-	// worker that evicts pods from unresponsive nodes.
+	// Lock to guard access to the evictor workers.
+	evictorLock *sync.Mutex
+	// workers that evict pods from unresponsive nodes.
 	podEvictor         *RateLimitedTimedQueue
 	terminationEvictor *RateLimitedTimedQueue
 	podEvictionTimeout time.Duration
@@ -120,6 +123,7 @@ func NewNodeController(
 	if allocateNodeCIDRs && clusterCIDR == nil {
 		glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
 	}
+	evictorLock := sync.Mutex{}
 	return &NodeController{
 		cloud:              cloud,
 		knownNodeSet:       make(util.StringSet),
@@ -127,8 +131,9 @@ func NewNodeController(
 		recorder:           recorder,
 		podEvictionTimeout: podEvictionTimeout,
 		maximumGracePeriod: 5 * time.Minute,
-		podEvictor:         NewRateLimitedTimedQueue(podEvictionLimiter, false),
-		terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
+		evictorLock:        &evictorLock,
+		podEvictor:         NewRateLimitedTimedQueue(podEvictionLimiter),
+		terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter),
 		nodeStatusMap:      make(map[string]nodeStatusData),
 		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
 		nodeMonitorPeriod:      nodeMonitorPeriod,
@@ -162,6 +167,8 @@ func (nc *NodeController) Run(period time.Duration) {
 	//   c. If there are pods still terminating, wait for their estimated completion
 	//      before retrying
 	go util.Until(func() {
+		nc.evictorLock.Lock()
+		defer nc.evictorLock.Unlock()
 		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 			remaining, err := nc.deletePods(value.Value)
 			if err != nil {
@@ -178,6 +185,8 @@ func (nc *NodeController) Run(period time.Duration) {
 	// TODO: replace with a controller that ensures pods that are terminating complete
 	// in a particular time period
 	go util.Until(func() {
+		nc.evictorLock.Lock()
+		defer nc.evictorLock.Unlock()
 		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 			completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
 			if err != nil {
@@ -551,12 +560,17 @@ func (nc *NodeController) hasPods(nodeName string) (bool, error) {
 // evictPods queues an eviction for the provided node name, and returns false if the node is already
 // queued for eviction.
 func (nc *NodeController) evictPods(nodeName string) bool {
+	nc.evictorLock.Lock()
+	defer nc.evictorLock.Unlock()
 	return nc.podEvictor.Add(nodeName)
 }
 
 // cancelPodEviction removes any queued evictions, typically because the node is available again. It
 // returns true if an eviction was queued.
 func (nc *NodeController) cancelPodEviction(nodeName string) bool {
+	glog.V(2).Infof("Cancelling pod eviction on node: %v", nodeName)
+	nc.evictorLock.Lock()
+	defer nc.evictorLock.Unlock()
 	wasDeleting := nc.podEvictor.Remove(nodeName)
 	wasTerminating := nc.terminationEvictor.Remove(nodeName)
 	return wasDeleting || wasTerminating
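Note on the locking pattern above: the new evictorLock serializes every access to podEvictor and terminationEvictor, so the two eviction workers started in Run and the monitor path (evictPods / cancelPodEviction) never mutate the queues concurrently. Below is a minimal, self-contained sketch of the same pattern; the names (queueSet, evict, cancel) are illustrative stand-ins, not the controller's real types.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// queueSet mimics the controller's pair of evictor queues guarded by a
// single mutex; both maps must only be touched while mu is held.
type queueSet struct {
	mu          sync.Mutex
	evicting    map[string]bool
	terminating map[string]bool
}

// evict queues a node for eviction, returning false if it is already
// queued, mirroring evictPods above.
func (q *queueSet) evict(node string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.evicting[node] {
		return false
	}
	q.evicting[node] = true
	return true
}

// cancel removes a node from both queues and reports whether anything
// was queued, mirroring cancelPodEviction above.
func (q *queueSet) cancel(node string) bool {
	q.mu.Lock()
	defer q.mu.Unlock()
	wasDeleting := q.evicting[node]
	wasTerminating := q.terminating[node]
	delete(q.evicting, node)
	delete(q.terminating, node)
	return wasDeleting || wasTerminating
}

func main() {
	q := &queueSet{evicting: map[string]bool{}, terminating: map[string]bool{}}
	go func() { q.evict("node-1") }() // concurrent writer, as in Run()
	time.Sleep(10 * time.Millisecond)
	fmt.Println("eviction was queued:", q.cancel("node-1"))
}
```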
diff --git a/pkg/controller/node/rate_limited_queue.go b/pkg/controller/node/rate_limited_queue.go
index 550b646941f..2fcd0963d1b 100644
--- a/pkg/controller/node/rate_limited_queue.go
+++ b/pkg/controller/node/rate_limited_queue.go
@@ -136,19 +136,16 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
 type RateLimitedTimedQueue struct {
 	queue   UniqueQueue
 	limiter util.RateLimiter
-	leak    bool
 }
 
-// Creates new queue which will use given RateLimiter to oversee execution. If leak is true,
-// items which are rate limited will be leakped. Otherwise, rate limited items will be requeued.
-func NewRateLimitedTimedQueue(limiter util.RateLimiter, leak bool) *RateLimitedTimedQueue {
+// Creates new queue which will use given RateLimiter to oversee execution.
+func NewRateLimitedTimedQueue(limiter util.RateLimiter) *RateLimitedTimedQueue {
 	return &RateLimitedTimedQueue{
 		queue: UniqueQueue{
 			queue: TimedQueue{},
 			set:   util.NewStringSet(),
 		},
 		limiter: limiter,
-		leak:    leak,
 	}
 }
 
@@ -164,12 +161,9 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 	val, ok := q.queue.Head()
 	for ok {
 		// rate limit the queue checking
-		if q.leak {
-			if !q.limiter.CanAccept() {
-				break
-			}
-		} else {
-			q.limiter.Accept()
+		if !q.limiter.CanAccept() {
+			// Try again later
+			break
 		}
 
 		now := now()
diff --git a/pkg/controller/node/rate_limited_queue_test.go b/pkg/controller/node/rate_limited_queue_test.go
index a3a49905255..5dc8a4d81a4 100644
--- a/pkg/controller/node/rate_limited_queue_test.go
+++ b/pkg/controller/node/rate_limited_queue_test.go
@@ -38,7 +38,7 @@ func CheckSetEq(lhs, rhs util.StringSet) bool {
 }
 
 func TestAddNode(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -61,7 +61,7 @@ func TestAddNode(t *testing.T) {
 }
 
 func TestDelNode(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -83,7 +83,7 @@ func TestDelNode(t *testing.T) {
 		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 	}
 
-	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -105,7 +105,7 @@ func TestDelNode(t *testing.T) {
 		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 	}
 
-	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -129,7 +129,7 @@ func TestDelNode(t *testing.T) {
 }
 
 func TestTry(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -151,7 +151,7 @@ func TestTry(t *testing.T) {
 }
 
 func TestTryOrdering(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -183,7 +183,7 @@ func TestTryOrdering(t *testing.T) {
 }
 
 func TestTryRemovingWhileTry(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
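The behavioural point of the rate_limited_queue.go change: Try no longer blocks in limiter.Accept() while waiting for a token (which would now stall the caller holding evictorLock); it checks CanAccept() and bails out, leaving the remaining items queued for the next periodic call. A rough, self-contained sketch of that drain loop follows, with tokenBucket as a hypothetical stand-in for util.RateLimiter rather than the real interface.

```go
package main

import "fmt"

// tokenBucket is a stand-in for util.RateLimiter; CanAccept consumes a
// token when one is available and never blocks.
type tokenBucket struct{ tokens int }

func (b *tokenBucket) CanAccept() bool {
	if b.tokens > 0 {
		b.tokens--
		return true
	}
	return false
}

// tryDrain mirrors the reworked Try loop: process the head while tokens
// last, and bail out (instead of blocking) once the limiter says no,
// leaving the remainder queued for the caller's next tick.
func tryDrain(queue []string, limiter *tokenBucket, fn func(string) bool) []string {
	for len(queue) > 0 {
		if !limiter.CanAccept() {
			break // try again later, on the next tick
		}
		if fn(queue[0]) {
			queue = queue[1:] // done with this item
		}
	}
	return queue
}

func main() {
	limiter := &tokenBucket{tokens: 2}
	left := tryDrain([]string{"first", "second", "third"}, limiter,
		func(node string) bool { fmt.Println("evicting", node); return true })
	fmt.Println("left for next tick:", left)
}
```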
diff --git a/test/e2e/resize_nodes.go b/test/e2e/resize_nodes.go
index 7ae33f1ad75..c3674726a01 100644
--- a/test/e2e/resize_nodes.go
+++ b/test/e2e/resize_nodes.go
@@ -522,10 +522,13 @@ var _ = Describe("Nodes", func() {
 			By(fmt.Sprintf("block network traffic from node %s", node.Name))
 			performTemporaryNetworkFailure(c, ns, name, replicas, pods.Items[0].Name, node)
 			Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
-			if !waitForNodeToBe(c, node.Name, true, resizeNodeReadyTimeout) {
+			if !waitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
 				Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
 			}
 
+			// Sleep a bit to allow the watch in the NodeController to catch up.
+			time.Sleep(5 * time.Second)
+
 			By("verify whether new pods can be created on the re-attached node")
 			// increasing the RC size is not a valid way to test this
 			// since we have no guarantees the pod will be scheduled on our node.
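The e2e tweak pairs a readiness wait with a short settle delay, so that the NodeController's watch can observe the Ready transition before the test schedules pods. A hedged sketch of that wait-then-settle pattern is below; pollNodeReady is a hypothetical helper, not the real waitForNodeToBeReady.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pollNodeReady polls isReady until it succeeds or timeout elapses,
// roughly what waitForNodeToBeReady does against the node's conditions.
func pollNodeReady(isReady func() bool, timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if isReady() {
			return nil
		}
		time.Sleep(interval)
	}
	return errors.New("node did not become ready in time")
}

func main() {
	start := time.Now()
	ready := func() bool { return time.Since(start) > 30*time.Millisecond }
	if err := pollNodeReady(ready, time.Second, 10*time.Millisecond); err != nil {
		fmt.Println(err)
		return
	}
	// Settle delay: give watch-based consumers (the NodeController, in the
	// e2e test) a moment to observe the transition before proceeding.
	time.Sleep(20 * time.Millisecond)
	fmt.Println("node ready and watchers caught up")
}
```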