Merge pull request #13642 from gmarek/nodecontroller_race

Auto commit by PR queue bot
k8s-merge-robot 2015-09-07 13:01:59 -07:00
commit bb3e20e361
4 changed files with 33 additions and 22 deletions

View File

@@ -20,6 +20,7 @@ import (
 	"errors"
 	"fmt"
 	"net"
+	"sync"
 	"time"

 	"github.com/golang/glog"
@@ -88,7 +89,9 @@ type NodeController struct {
 	// to aviod the problem with time skew across the cluster.
 	nodeStatusMap map[string]nodeStatusData
 	now           func() util.Time
-	// worker that evicts pods from unresponsive nodes.
+	// Lock to access evictor workers
+	evictorLock *sync.Mutex
+	// workers that evicts pods from unresponsive nodes.
 	podEvictor         *RateLimitedTimedQueue
 	terminationEvictor *RateLimitedTimedQueue
 	podEvictionTimeout time.Duration
@@ -120,6 +123,7 @@ func NewNodeController(
 	if allocateNodeCIDRs && clusterCIDR == nil {
 		glog.Fatal("NodeController: Must specify clusterCIDR if allocateNodeCIDRs == true.")
 	}
+	evictorLock := sync.Mutex{}
 	return &NodeController{
 		cloud:        cloud,
 		knownNodeSet: make(util.StringSet),
@@ -127,8 +131,9 @@
 		recorder:           recorder,
 		podEvictionTimeout: podEvictionTimeout,
 		maximumGracePeriod: 5 * time.Minute,
-		podEvictor:         NewRateLimitedTimedQueue(podEvictionLimiter, false),
-		terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter, false),
+		evictorLock:        &evictorLock,
+		podEvictor:         NewRateLimitedTimedQueue(podEvictionLimiter),
+		terminationEvictor: NewRateLimitedTimedQueue(podEvictionLimiter),
		nodeStatusMap:          make(map[string]nodeStatusData),
		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
		nodeMonitorPeriod:      nodeMonitorPeriod,
@@ -162,6 +167,8 @@ func (nc *NodeController) Run(period time.Duration) {
 	// c. If there are pods still terminating, wait for their estimated completion
 	//    before retrying
 	go util.Until(func() {
+		nc.evictorLock.Lock()
+		defer nc.evictorLock.Unlock()
 		nc.podEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 			remaining, err := nc.deletePods(value.Value)
 			if err != nil {
@@ -178,6 +185,8 @@ func (nc *NodeController) Run(period time.Duration) {
 	// TODO: replace with a controller that ensures pods that are terminating complete
 	// in a particular time period
 	go util.Until(func() {
+		nc.evictorLock.Lock()
+		defer nc.evictorLock.Unlock()
 		nc.terminationEvictor.Try(func(value TimedValue) (bool, time.Duration) {
 			completed, remaining, err := nc.terminatePods(value.Value, value.AddedAt)
 			if err != nil {
@@ -551,12 +560,17 @@ func (nc *NodeController) hasPods(nodeName string) (bool, error) {
 // evictPods queues an eviction for the provided node name, and returns false if the node is already
 // queued for eviction.
 func (nc *NodeController) evictPods(nodeName string) bool {
+	nc.evictorLock.Lock()
+	defer nc.evictorLock.Unlock()
 	return nc.podEvictor.Add(nodeName)
 }

 // cancelPodEviction removes any queued evictions, typically because the node is available again. It
 // returns true if an eviction was queued.
 func (nc *NodeController) cancelPodEviction(nodeName string) bool {
+	glog.V(2).Infof("Cancelling pod Eviction on Node: %v", nodeName)
+	nc.evictorLock.Lock()
+	defer nc.evictorLock.Unlock()
 	wasDeleting := nc.podEvictor.Remove(nodeName)
 	wasTerminating := nc.terminationEvictor.Remove(nodeName)
 	return wasDeleting || wasTerminating
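
Taken together, the hunks above serialize every access to the two eviction queues behind one mutex: both util.Until workers, evictPods, and cancelPodEviction now take evictorLock, so queueing and draining can no longer interleave. Below is a minimal, self-contained sketch of that pattern, not the controller's code — evictor, add, cancel, drain, and the node names are illustrative stand-ins.

package main

import (
	"fmt"
	"sync"
	"time"
)

// evictor stands in for the controller's pair of queues; the single
// mutex plays the role of evictorLock in the diff above.
type evictor struct {
	lock        sync.Mutex
	toEvict     []string
	toTerminate []string
}

// add queues a node on both lists under the lock (compare evictPods).
func (e *evictor) add(node string) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.toEvict = append(e.toEvict, node)
	e.toTerminate = append(e.toTerminate, node)
}

// cancel removes a node from both lists under the same lock (compare
// cancelPodEviction), so the two lists can never disagree.
func (e *evictor) cancel(node string) {
	e.lock.Lock()
	defer e.lock.Unlock()
	e.toEvict = without(e.toEvict, node)
	e.toTerminate = without(e.toTerminate, node)
}

// drain is what each util.Until worker does per tick: it holds the lock
// for the whole pass, so add and cancel cannot race with it.
func (e *evictor) drain(queue *[]string, handle func(string)) {
	e.lock.Lock()
	defer e.lock.Unlock()
	for _, n := range *queue {
		handle(n)
	}
	*queue = nil
}

// without returns s with every occurrence of v filtered out.
func without(s []string, v string) []string {
	out := s[:0]
	for _, x := range s {
		if x != v {
			out = append(out, x)
		}
	}
	return out
}

func main() {
	e := &evictor{}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // pod-eviction worker
		defer wg.Done()
		for i := 0; i < 3; i++ {
			e.drain(&e.toEvict, func(n string) { fmt.Println("evict pods on", n) })
			time.Sleep(10 * time.Millisecond)
		}
	}()
	go func() { // termination worker
		defer wg.Done()
		for i := 0; i < 3; i++ {
			e.drain(&e.toTerminate, func(n string) { fmt.Println("terminate pods on", n) })
			time.Sleep(10 * time.Millisecond)
		}
	}()
	e.add("node-1")
	e.cancel("node-1") // safe even if a drain pass is in flight
	e.add("node-2")
	wg.Wait()
}

Holding the lock for the whole drain pass is what makes a cancel atomic with respect to both queues, which appears to be the race the PR branch name refers to.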

View File

@@ -136,19 +136,16 @@ func (q *UniqueQueue) Head() (TimedValue, bool) {
 type RateLimitedTimedQueue struct {
 	queue   UniqueQueue
 	limiter util.RateLimiter
-	leak    bool
 }

-// Creates new queue which will use given RateLimiter to oversee execution. If leak is true,
-// items which are rate limited will be leakped. Otherwise, rate limited items will be requeued.
-func NewRateLimitedTimedQueue(limiter util.RateLimiter, leak bool) *RateLimitedTimedQueue {
+// Creates new queue which will use given RateLimiter to oversee execution.
+func NewRateLimitedTimedQueue(limiter util.RateLimiter) *RateLimitedTimedQueue {
 	return &RateLimitedTimedQueue{
 		queue: UniqueQueue{
 			queue: TimedQueue{},
 			set:   util.NewStringSet(),
 		},
 		limiter: limiter,
-		leak:    leak,
 	}
 }
@@ -164,12 +161,9 @@ func (q *RateLimitedTimedQueue) Try(fn ActionFunc) {
 	val, ok := q.queue.Head()
 	for ok {
 		// rate limit the queue checking
-		if q.leak {
-			if !q.limiter.CanAccept() {
-				break
-			}
-		} else {
-			q.limiter.Accept()
+		if !q.limiter.CanAccept() {
+			// Try again later
+			break
 		}

 		now := now()
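
With the leak flag removed, Try has a single rate-limiting behaviour: a non-blocking CanAccept check, and when no capacity is available it breaks out of the loop, leaving the remaining items queued for a later pass rather than blocking inside Accept. Here is a small self-contained sketch of that loop shape; tokenLimiter and the plain string slice are toy stand-ins, not the util.RateLimiter or UniqueQueue types.

package main

import "fmt"

// tokenLimiter is a toy stand-in for util.RateLimiter: CanAccept
// consumes a token when one is available and never blocks.
type tokenLimiter struct{ tokens int }

func (l *tokenLimiter) CanAccept() bool {
	if l.tokens > 0 {
		l.tokens--
		return true
	}
	return false
}

func main() {
	queue := []string{"first", "second", "third", "fourth"}
	limiter := &tokenLimiter{tokens: 2}

	// Mirrors the shape of the new Try loop: work from the head while the
	// limiter has capacity; once CanAccept reports false, break and leave
	// the rest queued for a later pass instead of blocking the worker.
	for len(queue) > 0 {
		if !limiter.CanAccept() {
			fmt.Println("rate limited, will retry later:", queue)
			break
		}
		head := queue[0]
		queue = queue[1:]
		fmt.Println("processed", head)
	}
}

The practical effect is that a worker never sleeps inside Try waiting for a token; combined with the new evictorLock in the controller, that keeps the time spent holding the lock bounded.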

View File

@@ -38,7 +38,7 @@ func CheckSetEq(lhs, rhs util.StringSet) bool {
 }

 func TestAddNode(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -61,7 +61,7 @@ func TestAddNode(t *testing.T) {
 }

 func TestDelNode(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -83,7 +83,7 @@ func TestDelNode(t *testing.T) {
 		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 	}

-	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -105,7 +105,7 @@ func TestDelNode(t *testing.T) {
 		t.Errorf("Invalid map. Got %v, expected %v", evictor.queue.set, setPattern)
 	}

-	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor = NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -129,7 +129,7 @@ func TestDelNode(t *testing.T) {
 }

 func TestTry(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), true)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -151,7 +151,7 @@ func TestTry(t *testing.T) {
 }

 func TestTryOrdering(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")
@@ -183,7 +183,7 @@ func TestTryOrdering(t *testing.T) {
 }

 func TestTryRemovingWhileTry(t *testing.T) {
-	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter(), false)
+	evictor := NewRateLimitedTimedQueue(util.NewFakeRateLimiter())
 	evictor.Add("first")
 	evictor.Add("second")
 	evictor.Add("third")

View File

@ -522,10 +522,13 @@ var _ = Describe("Nodes", func() {
By(fmt.Sprintf("block network traffic from node %s", node.Name)) By(fmt.Sprintf("block network traffic from node %s", node.Name))
performTemporaryNetworkFailure(c, ns, name, replicas, pods.Items[0].Name, node) performTemporaryNetworkFailure(c, ns, name, replicas, pods.Items[0].Name, node)
Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name) Logf("Waiting %v for node %s to be ready once temporary network failure ends", resizeNodeReadyTimeout, node.Name)
if !waitForNodeToBe(c, node.Name, true, resizeNodeReadyTimeout) { if !waitForNodeToBeReady(c, node.Name, resizeNodeReadyTimeout) {
Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout) Failf("Node %s did not become ready within %v", node.Name, resizeNodeReadyTimeout)
} }
// sleep a bit, to allow Watch in NodeController to catch up.
time.Sleep(5 * time.Second)
By("verify whether new pods can be created on the re-attached node") By("verify whether new pods can be created on the re-attached node")
// increasing the RC size is not a valid way to test this // increasing the RC size is not a valid way to test this
// since we have no guarantees the pod will be scheduled on our node. // since we have no guarantees the pod will be scheduled on our node.