diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index e9099766e4b..23649a6d7a1 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -119,7 +119,8 @@ type NodeController struct {
 	daemonSetController *framework.Controller
 	daemonSetStore      cache.StoreToDaemonSetLister
-	forcefullyDeletePod func(*api.Pod) error
+	forcefullyDeletePod       func(*api.Pod) error
+	nodeExistsInCloudProvider func(string) (bool, error)
 }
 
 // NewNodeController returns a new node controller to sync instances from cloudprovider.
@@ -149,24 +150,25 @@ func NewNodeController(
 	evictorLock := sync.Mutex{}
 
 	nc := &NodeController{
-		cloud:                  cloud,
-		knownNodeSet:           make(sets.String),
-		kubeClient:             kubeClient,
-		recorder:               recorder,
-		podEvictionTimeout:     podEvictionTimeout,
-		maximumGracePeriod:     5 * time.Minute,
-		evictorLock:            &evictorLock,
-		podEvictor:             NewRateLimitedTimedQueue(deletionEvictionLimiter),
-		terminationEvictor:     NewRateLimitedTimedQueue(terminationEvictionLimiter),
-		nodeStatusMap:          make(map[string]nodeStatusData),
-		nodeMonitorGracePeriod: nodeMonitorGracePeriod,
-		nodeMonitorPeriod:      nodeMonitorPeriod,
-		nodeStartupGracePeriod: nodeStartupGracePeriod,
-		lookupIP:               net.LookupIP,
-		now:                    unversioned.Now,
-		clusterCIDR:            clusterCIDR,
-		allocateNodeCIDRs:      allocateNodeCIDRs,
-		forcefullyDeletePod:    func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
+		cloud:                     cloud,
+		knownNodeSet:              make(sets.String),
+		kubeClient:                kubeClient,
+		recorder:                  recorder,
+		podEvictionTimeout:        podEvictionTimeout,
+		maximumGracePeriod:        5 * time.Minute,
+		evictorLock:               &evictorLock,
+		podEvictor:                NewRateLimitedTimedQueue(deletionEvictionLimiter),
+		terminationEvictor:        NewRateLimitedTimedQueue(terminationEvictionLimiter),
+		nodeStatusMap:             make(map[string]nodeStatusData),
+		nodeMonitorGracePeriod:    nodeMonitorGracePeriod,
+		nodeMonitorPeriod:         nodeMonitorPeriod,
+		nodeStartupGracePeriod:    nodeStartupGracePeriod,
+		lookupIP:                  net.LookupIP,
+		now:                       unversioned.Now,
+		clusterCIDR:               clusterCIDR,
+		allocateNodeCIDRs:         allocateNodeCIDRs,
+		forcefullyDeletePod:       func(p *api.Pod) error { return forcefullyDeletePod(kubeClient, p) },
+		nodeExistsInCloudProvider: func(nodeName string) (bool, error) { return nodeExistsInCloudProvider(cloud, nodeName) },
 	}
 
 	nc.podStore.Store, nc.podController = framework.NewInformer(
@@ -462,33 +464,26 @@ func (nc *NodeController) monitorNodeStatus() error {
 			}
 
 			// Check with the cloud provider to see if the node still exists. If it
-			// doesn't, delete the node and all pods scheduled on the node.
+			// doesn't, delete the node immediately.
 			if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
-				instances, ok := nc.cloud.Instances()
-				if !ok {
-					glog.Errorf("%v", ErrCloudInstance)
+				exists, err := nc.nodeExistsInCloudProvider(node.Name)
+				if err != nil {
+					glog.Errorf("Error determining if node %v exists in cloud: %v", node.Name, err)
 					continue
 				}
-				if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
+				if !exists {
 					glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
 					nc.recordNodeEvent(node.Name, api.EventTypeNormal, "DeletingNode", fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
-
-					remaining, err := nc.hasPods(node.Name)
-					if err != nil {
-						glog.Errorf("Unable to determine whether node %s has pods, will retry: %v", node.Name, err)
-						continue
-					}
-					if remaining {
-						// queue eviction of the pods on the node
-						glog.V(2).Infof("Deleting node %s is delayed while pods are evicted", node.Name)
-						nc.evictPods(node.Name)
-						continue
-					}
-
-					if err := nc.kubeClient.Core().Nodes().Delete(node.Name, nil); err != nil {
-						glog.Errorf("Unable to delete node %s: %v", node.Name, err)
-						continue
-					}
+					go func(nodeName string) {
+						defer utilruntime.HandleCrash()
+						// Kubelet is not reporting and Cloud Provider says node
+						// is gone. Delete it without worrying about grace
+						// periods.
+						if err := nc.forcefullyDeleteNode(nodeName); err != nil {
+							glog.Errorf("Unable to forcefully delete node %q: %v", nodeName, err)
+						}
+					}(node.Name)
+					continue
 				}
 			}
 		}
@@ -496,6 +491,43 @@ func (nc *NodeController) monitorNodeStatus() error {
 	return nil
 }
 
+func nodeExistsInCloudProvider(cloud cloudprovider.Interface, nodeName string) (bool, error) {
+	instances, ok := cloud.Instances()
+	if !ok {
+		return false, fmt.Errorf("%v", ErrCloudInstance)
+	}
+	if _, err := instances.ExternalID(nodeName); err != nil {
+		if err == cloudprovider.InstanceNotFound {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+// forcefullyDeleteNode immediately deletes all pods on the node, and then
+// deletes the node itself.
+func (nc *NodeController) forcefullyDeleteNode(nodeName string) error {
+	selector := fields.OneTermEqualSelector(api.PodHostField, nodeName)
+	options := api.ListOptions{FieldSelector: selector}
+	pods, err := nc.kubeClient.Core().Pods(api.NamespaceAll).List(options)
+	if err != nil {
+		return fmt.Errorf("unable to list pods on node %q: %v", nodeName, err)
+	}
+	for _, pod := range pods.Items {
+		if pod.Spec.NodeName != nodeName {
+			continue
+		}
+		if err := nc.forcefullyDeletePod(&pod); err != nil {
+			return fmt.Errorf("unable to delete pod %q on node %q: %v", pod.Name, nodeName, err)
+		}
+	}
+	if err := nc.kubeClient.Core().Nodes().Delete(nodeName, nil); err != nil {
+		return fmt.Errorf("unable to delete node %q: %v", nodeName, err)
+	}
+	return nil
+}
+
 // reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
 // if it doesn't currently have one.
 func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
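Note on the control flow above: forcefullyDeleteNode is launched in its own goroutine so a slow or hung cloud API call cannot stall the monitorNodeStatus loop, and the deferred utilruntime.HandleCrash() turns a panic in that goroutine into a logged, recovered event instead of taking down the controller process; if the deletion fails, the next monitorNodeStatus tick simply retries. A minimal standalone sketch of this fire-and-forget pattern, assuming the utilruntime alias points at k8s.io/kubernetes/pkg/util/runtime as it does in this tree (deleteNodeAsync and its deleteNode parameter are illustrative names, not part of this patch):

package main

import (
	"fmt"
	"log"

	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
)

// deleteNodeAsync mirrors the go func(...) block above: run the deletion off
// the caller's critical path and rely on the caller's periodic loop to retry
// on failure.
func deleteNodeAsync(nodeName string, deleteNode func(string) error) {
	go func(name string) {
		// HandleCrash recovers and logs a panic instead of crashing the process.
		defer utilruntime.HandleCrash()
		if err := deleteNode(name); err != nil {
			log.Printf("unable to forcefully delete node %q: %v", name, err)
		}
	}(nodeName)
}

func main() {
	done := make(chan struct{})
	deleteNodeAsync("node0", func(name string) error {
		defer close(done)
		fmt.Printf("deleting %s\n", name)
		return nil
	})
	<-done // wait so the program doesn't exit before the goroutine runs
}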
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 8961070cc95..72f7fd94273 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -30,7 +30,9 @@ import (
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
+	fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
 	"k8s.io/kubernetes/pkg/util"
+	"k8s.io/kubernetes/pkg/util/wait"
 	"k8s.io/kubernetes/pkg/watch"
 )
 
@@ -59,7 +61,8 @@ type FakeNodeHandler struct {
 	RequestCount int
 
 	// Synchronization
-	createLock sync.Mutex
+	createLock     sync.Mutex
+	deleteWaitChan chan struct{}
 }
 
 type FakeLegacyHandler struct {
@@ -125,6 +128,11 @@ func (m *FakeNodeHandler) List(opts api.ListOptions) (*api.NodeList, error) {
 }
 
 func (m *FakeNodeHandler) Delete(id string, opt *api.DeleteOptions) error {
+	defer func() {
+		if m.deleteWaitChan != nil {
+			m.deleteWaitChan <- struct{}{}
+		}
+	}()
 	m.DeletedNodes = append(m.DeletedNodes, newNode(id))
 	m.RequestCount++
 	return nil
@@ -451,6 +459,58 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	}
 }
 
+// TestCloudProviderNoRateLimit tests that monitorNodeStatus() immediately
+// deletes pods and the node when kubelet has not reported and the cloud
+// provider says the node is gone.
+func TestCloudProviderNoRateLimit(t *testing.T) {
+	fnh := &FakeNodeHandler{
+		Existing: []*api.Node{
+			{
+				ObjectMeta: api.ObjectMeta{
+					Name:              "node0",
+					CreationTimestamp: unversioned.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+				},
+				Status: api.NodeStatus{
+					Conditions: []api.NodeCondition{
+						{
+							Type:               api.NodeReady,
+							Status:             api.ConditionUnknown,
+							LastHeartbeatTime:  unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+							LastTransitionTime: unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC),
+						},
+					},
+				},
+			},
+		},
+		Clientset:      fake.NewSimpleClientset(&api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node0")}}),
+		deleteWaitChan: make(chan struct{}),
+	}
+	nodeController := NewNodeController(nil, fnh, 10*time.Minute,
+		util.NewFakeAlwaysRateLimiter(), util.NewFakeAlwaysRateLimiter(),
+		testNodeMonitorGracePeriod, testNodeStartupGracePeriod,
+		testNodeMonitorPeriod, nil, false)
+	nodeController.cloud = &fakecloud.FakeCloud{}
+	nodeController.now = func() unversioned.Time { return unversioned.Date(2016, 1, 1, 12, 0, 0, 0, time.UTC) }
+	nodeController.nodeExistsInCloudProvider = func(nodeName string) (bool, error) {
+		return false, nil
+	}
+	// monitorNodeStatus should allow this node to be immediately deleted
+	if err := nodeController.monitorNodeStatus(); err != nil {
+		t.Errorf("unexpected error: %v", err)
+	}
+	select {
+	case <-fnh.deleteWaitChan:
+	case <-time.After(wait.ForeverTestTimeout):
+		t.Errorf("Timed out waiting %v for node to be deleted", wait.ForeverTestTimeout)
+	}
+	if len(fnh.DeletedNodes) != 1 || fnh.DeletedNodes[0].Name != "node0" {
+		t.Errorf("Node was not deleted")
+	}
+	if nodeOnQueue := nodeController.podEvictor.Remove("node0"); nodeOnQueue {
+		t.Errorf("Node was queued for eviction. Should have been immediately deleted.")
+	}
+}
+
 func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	fakeNow := unversioned.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
 	table := []struct {
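For reference, the deleteWaitChan handshake that TestCloudProviderNoRateLimit relies on is an ordinary signal-on-completion channel paired with a select timeout, so a deletion that never happens fails the test promptly instead of hanging it. A self-contained sketch of the same pattern using only the standard library (fakeHandler is an illustrative name, and the 30-second literal merely stands in for wait.ForeverTestTimeout; neither is part of this patch):

package main

import (
	"fmt"
	"time"
)

type fakeHandler struct {
	deleteWaitChan chan struct{}
}

// Delete does its bookkeeping first, then signals any waiter; the nil check
// keeps the fake usable in tests that never set the channel up.
func (f *fakeHandler) Delete(name string) {
	defer func() {
		if f.deleteWaitChan != nil {
			f.deleteWaitChan <- struct{}{}
		}
	}()
	// ... record the deletion, e.g. append to a DeletedNodes slice ...
}

func main() {
	f := &fakeHandler{deleteWaitChan: make(chan struct{})}
	go f.Delete("node0") // stands in for the controller's deletion goroutine

	select {
	case <-f.deleteWaitChan:
		fmt.Println("delete observed")
	case <-time.After(30 * time.Second): // plays the role of wait.ForeverTestTimeout
		fmt.Println("timed out waiting for delete")
	}
}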