diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index b7f4c15ddbd..a90ddf87987 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -57,6 +57,7 @@ type NodeController struct {
 	cloud                   cloudprovider.Interface
 	clusterCIDR             *net.IPNet
 	deletingPodsRateLimiter util.RateLimiter
+	knownNodeSet            util.StringSet
 	kubeClient              client.Interface
 	// Method for easy mocking in unittest.
 	lookupIP func(host string) ([]net.IP, error)
@@ -118,6 +119,7 @@ func NewNodeController(
 	}
 	return &NodeController{
 		cloud:                  cloud,
+		knownNodeSet:           make(util.StringSet),
 		kubeClient:             kubeClient,
 		recorder:               recorder,
 		podEvictionTimeout:     podEvictionTimeout,
@@ -147,6 +149,12 @@ func (nc *NodeController) Run(period time.Duration) {
 	}, nodeEvictionPeriod)
 }
 
+// deleteNode handles a Node deletion observed in etcd. Currently the only cleanup
+// needed is removing the Pods that were assigned to that Node.
+func (nc *NodeController) deleteNode(nodeID string) error {
+	return nc.deletePods(nodeID)
+}
+
 // deletePods will delete all pods from master running on given node.
 func (nc *NodeController) deletePods(nodeID string) error {
 	glog.V(2).Infof("Delete all pods from %v", nodeID)
@@ -203,6 +211,38 @@ func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api
 // not reachable for a long period of time.
 func (nc *NodeController) monitorNodeStatus() error {
 	nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
+	// Bail out before touching the result: after a failed List, nodes may be nil
+	// or incomplete, and reconciling knownNodeSet against it could remove pods
+	// from Nodes that still exist.
+	if err != nil {
+		return err
+	}
+	for _, node := range nodes.Items {
+		if !nc.knownNodeSet.Has(node.Name) {
+			glog.V(1).Infof("NodeController observed a new Node: %#v", node)
+			nc.recordNodeEvent(node.Name, fmt.Sprintf("Registered Node %v in NodeController", node.Name))
+			nc.knownNodeSet.Insert(node.Name)
+		}
+	}
+	// Every observed Node is in knownNodeSet at this point, so the two lengths
+	// differ only if some previously known Node is no longer listed.
+	if len(nc.knownNodeSet) != len(nodes.Items) {
+		observedSet := make(util.StringSet)
+		for _, node := range nodes.Items {
+			observedSet.Insert(node.Name)
+		}
+		deleted := nc.knownNodeSet.Difference(observedSet)
+		for node := range deleted {
+			glog.V(1).Infof("NodeController observed a Node deletion: %v", node)
+			nc.recordNodeEvent(node, fmt.Sprintf("Removing Node %v from NodeController", node))
+			if err := nc.deleteNode(node); err != nil {
+				// Keep the Node in knownNodeSet so the cleanup is retried on the next pass.
+				glog.Errorf("Unable to delete pods from Node %v: %v", node, err)
+				continue
+			}
+			nc.knownNodeSet.Delete(node)
+		}
+	}
+
 	if err != nil {
 		return err
 	}
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 4648a3529e2..d10b96b49a0 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -554,6 +554,83 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	}
 }
 
+func TestNodeDeletion(t *testing.T) {
+	fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
+	fakeNodeHandler := &FakeNodeHandler{
+		Existing: []*api.Node{
+			{
+				ObjectMeta: api.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+				Status: api.NodeStatus{
+					Conditions: []api.NodeCondition{
+						{
+							Type:   api.NodeReady,
+							Status: api.ConditionTrue,
+							// Node status has just been updated.
+							LastHeartbeatTime:  fakeNow,
+							LastTransitionTime: fakeNow,
+						},
+					},
+					Capacity: api.ResourceList{
+						api.ResourceName(api.ResourceCPU):    resource.MustParse("10"),
+						api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
+					},
+				},
+				Spec: api.NodeSpec{
+					ExternalID: "node0",
+				},
+			},
+			{
+				ObjectMeta: api.ObjectMeta{
+					Name:              "node1",
+					CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+				Status: api.NodeStatus{
+					Conditions: []api.NodeCondition{
+						{
+							Type:   api.NodeReady,
+							Status: api.ConditionTrue,
+							// Node status has just been updated.
+							LastHeartbeatTime:  fakeNow,
+							LastTransitionTime: fakeNow,
+						},
+					},
+					Capacity: api.ResourceList{
+						api.ResourceName(api.ResourceCPU):    resource.MustParse("10"),
+						api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
+					},
+				},
+				Spec: api.NodeSpec{
+					ExternalID: "node1",
+				},
+			},
+		},
+		Fake: testclient.NewSimpleFake(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}),
+	}
+
+	nodeController := NewNodeController(nil, fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
+		testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
+	nodeController.now = func() util.Time { return fakeNow }
+	if err := nodeController.monitorNodeStatus(); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	fakeNodeHandler.Delete("node1")
+	if err := nodeController.monitorNodeStatus(); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	podEvicted := false
+	for _, action := range fakeNodeHandler.Actions() {
+		if action.GetVerb() == "delete" && action.GetResource() == "pods" {
+			podEvicted = true
+		}
+	}
+	if !podEvicted {
+		t.Error("expected pods to be evicted from the deleted node")
+	}
+}
+
 func newNode(name string) *api.Node {
 	return &api.Node{
 		ObjectMeta: api.ObjectMeta{Name: name},