From 0d5f8dfde16ad100b97048b45e654762b3be2c19 Mon Sep 17 00:00:00 2001 From: Deyuan Deng Date: Wed, 11 Mar 2015 23:00:52 -0400 Subject: [PATCH] Node controller monitor node status --- .../app/controllermanager.go | 2 +- .../controller/nodecontroller.go | 273 +++++++----- .../controller/nodecontroller_test.go | 407 ++++++++++-------- 3 files changed, 401 insertions(+), 281 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 53371a31de4..2c823b54b5d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -107,7 +107,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "The number of retries for initial node registration. Retry interval equals node_sync_period.") fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.") fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.") - fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controler send probes to kubelets and update NodeStatus.") + fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controller send probes to kubelets and update NodeStatus.") // TODO: Discover these by pinging the host machines, and rip out these flags. // TODO: in the meantime, use resource.QuantityFlag() instead of these fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node") diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 696a1f22083..2c82aef90e6 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -34,6 +34,20 @@ import ( "github.com/golang/glog" ) +const ( + // The constant is used if sync_nodes_status=False. NodeController will not proactively + // sync node status in this case, but will monitor node status updated from kubelet. If + // it doesn't receive update for this amount of time, it will start posting node NotReady + // condition. The amount of time when NodeController start evicting pods is controlled + // via flag 'pod_eviction_timeout'. + nodeMonitorGracePeriod = 8 * time.Second + // The constant is used if sync_nodes_status=False. It controls NodeController monitoring + // period, i.e. how often does NodeController check node status posted from kubelet. + // Theoretically, this value should be lower than nodeMonitorGracePeriod. + // TODO: Change node status monitor to watch based. + nodeMonitorPeriod = 5 * time.Second +) + var ( ErrRegistration = errors.New("unable to register all nodes.") ErrQueryIPAddress = errors.New("unable to query IP address.") @@ -53,8 +67,6 @@ type NodeController struct { } // NewNodeController returns a new node controller to sync instances from cloudprovider. -// TODO: NodeController health checker should be a separate package other than -// kubeletclient, node health check != kubelet health check. func NewNodeController( cloud cloudprovider.Interface, matchRE string, @@ -77,36 +89,34 @@ func NewNodeController( } } -// Run creates initial node list and start syncing instances from cloudprovider if any. +// Run creates initial node list and start syncing instances from cloudprovider, if any. // It also starts syncing cluster node status. // 1. 
RegisterNodes() is called only once to register all initial nodes (from cloudprovider // or from command line flag). To make cluster bootstrap faster, node controller populates // node addresses. -// 2. SyncCloud() is called periodically (if enabled) to sync instances from cloudprovider. +// 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider. // Node created here will only have specs. // 3. SyncNodeStatus() is called periodically (if enabled) to sync node status for nodes in -// k8s cluster. +// k8s cluster. If not enabled, MonitorNodeStatus() is called otherwise to monitor node +// status posted from kubelet. func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) { // Register intial set of nodes with their status set. var nodes *api.NodeList var err error if s.isRunningCloudProvider() { if syncNodeList { - nodes, err = s.GetCloudNodesWithSpec() - if err != nil { + if nodes, err = s.GetCloudNodesWithSpec(); err != nil { glog.Errorf("Error loading initial node from cloudprovider: %v", err) } } else { nodes = &api.NodeList{} } } else { - nodes, err = s.GetStaticNodesWithSpec() - if err != nil { + if nodes, err = s.GetStaticNodesWithSpec(); err != nil { glog.Errorf("Error loading initial static nodes: %v", err) } } - nodes, err = s.PopulateAddresses(nodes) - if err != nil { + if nodes, err = s.PopulateAddresses(nodes); err != nil { glog.Errorf("Error getting nodes ips: %v", err) } if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil { @@ -116,26 +126,25 @@ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus // Start syncing node list from cloudprovider. if syncNodeList && s.isRunningCloudProvider() { go util.Forever(func() { - if err = s.SyncCloud(); err != nil { + if err = s.SyncCloudNodes(); err != nil { glog.Errorf("Error syncing cloud: %v", err) } }, period) } + // Start syncing or monitoring node status. if syncNodeStatus { - // Start syncing node status. go util.Forever(func() { if err = s.SyncNodeStatus(); err != nil { glog.Errorf("Error syncing status: %v", err) } }, period) } else { - // Start checking node reachability and evicting timeouted pods. go util.Forever(func() { - if err = s.EvictTimeoutedPods(); err != nil { - glog.Errorf("Error evicting timeouted pods: %v", err) + if err = s.MonitorNodeStatus(); err != nil { + glog.Errorf("Error monitoring node status: %v", err) } - }, period) + }, nodeMonitorPeriod) } } @@ -173,8 +182,8 @@ func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retr } } -// SyncCloud synchronizes the list of instances from cloudprovider to master server. -func (s *NodeController) SyncCloud() error { +// SyncCloudNodes synchronizes the list of instances from cloudprovider to master server. +func (s *NodeController) SyncCloudNodes() error { matches, err := s.GetCloudNodesWithSpec() if err != nil { return err @@ -220,8 +229,7 @@ func (s *NodeController) SyncNodeStatus() error { if err != nil { return err } - nodes = s.UpdateNodesStatus(nodes) - nodes, err = s.PopulateAddresses(nodes) + nodes, err = s.PopulateNodesStatus(nodes) if err != nil { return err } @@ -238,90 +246,25 @@ func (s *NodeController) SyncNodeStatus() error { return nil } -// EvictTimeoutedPods verifies if nodes are reachable by checking the time of last probe -// and deletes pods from not reachable nodes. 
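[Editor's note] The Run() wiring above starts one of two periodic loops: SyncNodeStatus() when --sync_node_status is set, or MonitorNodeStatus() on the shorter nodeMonitorPeriod otherwise. Below is a minimal, self-contained sketch of that dispatch pattern, not part of the patch: it uses a plain time.Tick loop in place of util.Forever, and the periods and loop bodies are placeholders.

package main

import (
	"log"
	"time"
)

// forever mirrors the util.Forever wiring in Run(): call f every period, forever.
func forever(f func() error, period time.Duration) {
	for range time.Tick(period) {
		if err := f(); err != nil {
			log.Printf("loop error: %v", err)
		}
	}
}

func main() {
	syncNodeStatus := false          // stand-in for the --sync_node_status flag
	syncPeriod := 10 * time.Second   // stand-in for the controller's sync period
	monitorPeriod := 5 * time.Second // stand-in for nodeMonitorPeriod

	if syncNodeStatus {
		// Actively probe kubelets and push full node status.
		go forever(func() error { log.Println("SyncNodeStatus pass"); return nil }, syncPeriod)
	} else {
		// Only watch the status kubelets post, flagging nodes whose status goes stale.
		go forever(func() error { log.Println("MonitorNodeStatus pass"); return nil }, monitorPeriod)
	}
	time.Sleep(30 * time.Second) // let the chosen loop run a few times
}

Running the monitor loop every nodeMonitorPeriod (5s) keeps it comfortably inside nodeMonitorGracePeriod (8s), so a node is re-checked at least once before its last update can be considered stale.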
-func (s *NodeController) EvictTimeoutedPods() error { - nodes, err := s.kubeClient.Nodes().List() - if err != nil { - return err - } - for _, node := range nodes.Items { - if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) { - s.deletePods(node.Name) - } - } - return nil -} - -func latestReadyTime(node *api.Node) util.Time { - readyTime := node.ObjectMeta.CreationTimestamp - for _, condition := range node.Status.Conditions { - if condition.Type == api.NodeReady && - condition.Status == api.ConditionFull && - condition.LastProbeTime.After(readyTime.Time) { - readyTime = condition.LastProbeTime - } - } - return readyTime -} - -// PopulateAddresses queries Address for given list of nodes. -func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { - if s.isRunningCloudProvider() { - instances, ok := s.cloud.Instances() - if !ok { - return nodes, ErrCloudInstance - } - for i := range nodes.Items { - node := &nodes.Items[i] - nodeAddresses, err := instances.NodeAddresses(node.Name) - if err != nil { - glog.Errorf("error getting instance addresses for %s: %v", node.Name, err) - } else { - node.Status.Addresses = nodeAddresses - } - } - } else { - for i := range nodes.Items { - node := &nodes.Items[i] - addr := net.ParseIP(node.Name) - if addr != nil { - address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} - node.Status.Addresses = []api.NodeAddress{address} - } else { - addrs, err := s.lookupIP(node.Name) - if err != nil { - glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) - } else if len(addrs) == 0 { - glog.Errorf("No ip address for node %v", node.Name) - } else { - address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()} - node.Status.Addresses = []api.NodeAddress{address} - } - } - } - } - return nodes, nil -} - -// UpdateNodesStatus performs various condition checks for given list of nodes. -func (s *NodeController) UpdateNodesStatus(nodes *api.NodeList) *api.NodeList { +// PopulateNodesStatus populates node status for given list of nodes. +func (s *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) { var wg sync.WaitGroup wg.Add(len(nodes.Items)) for i := range nodes.Items { go func(node *api.Node) { node.Status.Conditions = s.DoCheck(node) - if err := s.updateNodeInfo(node); err != nil { + if err := s.populateNodeInfo(node); err != nil { glog.Errorf("Can't collect information for node %s: %v", node.Name, err) } wg.Done() }(&nodes.Items[i]) } wg.Wait() - return nodes + return s.PopulateAddresses(nodes) } -func (s *NodeController) updateNodeInfo(node *api.Node) error { +// populateNodeInfo gets node info from kubelet and update the node. +func (s *NodeController) populateNodeInfo(node *api.Node) error { nodeInfo, err := s.kubeletClient.GetNodeInfo(node.Name) if err != nil { return err @@ -341,7 +284,6 @@ func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition { oldReadyCondition := s.getCondition(node, api.NodeReady) newReadyCondition := s.checkNodeReady(node) s.updateLastTransitionTime(oldReadyCondition, newReadyCondition) - if newReadyCondition.Status != api.ConditionFull { // Node is not ready for this probe, we need to check if pods need to be deleted. if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { @@ -421,30 +363,95 @@ func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition { } } -// deletePods will delete all pods from master running on given node. 
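[Editor's note] PopulateNodesStatus above fans out one goroutine per node and joins them with a sync.WaitGroup before resolving addresses. A minimal, self-contained sketch of that fan-out shape, not part of the patch; Node and the fake check below are placeholders for api.Node and DoCheck:

package main

import (
	"fmt"
	"sync"
)

// Node stands in for api.Node with only the fields the sketch needs.
type Node struct {
	Name   string
	Status string
}

// populateStatus checks every node concurrently, the same shape as
// NodeController.PopulateNodesStatus in this patch.
func populateStatus(nodes []Node) {
	var wg sync.WaitGroup
	wg.Add(len(nodes))
	for i := range nodes {
		// Hand each goroutine a pointer to the slice element so it mutates
		// the real entry rather than a per-iteration copy.
		go func(n *Node) {
			defer wg.Done()
			n.Status = "checked: " + n.Name // placeholder for the kubelet health probe
		}(&nodes[i])
	}
	wg.Wait()
}

func main() {
	nodes := []Node{{Name: "node0"}, {Name: "node1"}}
	populateStatus(nodes)
	fmt.Println(nodes)
}

Passing &nodes[i] is the important detail, and it is the same reason the patch passes &nodes.Items[i]: each goroutine must update the element in place, not a copy.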
-func (s *NodeController) deletePods(nodeID string) error { - glog.V(2).Infof("Delete all pods from %v", nodeID) - // TODO: We don't yet have field selectors from client, see issue #1362. - pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) +// PopulateAddresses queries Address for given list of nodes. +func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { + if s.isRunningCloudProvider() { + instances, ok := s.cloud.Instances() + if !ok { + return nodes, ErrCloudInstance + } + for i := range nodes.Items { + node := &nodes.Items[i] + nodeAddresses, err := instances.NodeAddresses(node.Name) + if err != nil { + glog.Errorf("error getting instance addresses for %s: %v", node.Name, err) + } else { + node.Status.Addresses = nodeAddresses + } + } + } else { + for i := range nodes.Items { + node := &nodes.Items[i] + addr := net.ParseIP(node.Name) + if addr != nil { + address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} + node.Status.Addresses = []api.NodeAddress{address} + } else { + addrs, err := s.lookupIP(node.Name) + if err != nil { + glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) + } else if len(addrs) == 0 { + glog.Errorf("No ip address for node %v", node.Name) + } else { + address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()} + node.Status.Addresses = []api.NodeAddress{address} + } + } + } + } + return nodes, nil +} + +// MonitorNodeStatus verifies node status are constantly updated by kubelet, and if +// not, post node NotReady status. It also evicts all pods if node is not ready for +// a long period of time. +func (s *NodeController) MonitorNodeStatus() error { + nodes, err := s.kubeClient.Nodes().List() if err != nil { return err } - for _, pod := range pods.Items { - if pod.Status.Host != nodeID { - continue + for i := range nodes.Items { + node := &nodes.Items[i] + // Precompute condition times to avoid deep copy of node status (We'll modify node for updating, + // and NodeStatus.Conditions is an array, which makes assignment copy not useful). + latestConditionTime := s.latestConditionTime(node, api.NodeReady) + latestFullConditionTime := s.latestConditionTimeWithStatus(node, api.NodeReady, api.ConditionFull) + // Grace period has passed, post node NotReady condition to master, without contacting kubelet. + if util.Now().After(latestConditionTime.Add(nodeMonitorGracePeriod)) { + readyCondition := s.getCondition(node, api.NodeReady) + if readyCondition == nil { + node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: fmt.Sprintf("Kubelet never posted node status"), + LastProbeTime: util.Now(), + LastTransitionTime: util.Now(), + }) + } else { + readyCondition.Status = api.ConditionNone + readyCondition.Reason = fmt.Sprintf("Kubelet stop posting node status") + readyCondition.LastProbeTime = util.Now() + if readyCondition.Status == api.ConditionFull { + readyCondition.LastTransitionTime = util.Now() + } + } + glog.V(2).Infof("updating node %v, whose status hasn't been updated by kubelet for a long time", node.Name) + _, err = s.kubeClient.Nodes().Update(node) + if err != nil { + glog.Errorf("error updating node %s: %v", node.Name, err) + } } - glog.V(2).Infof("Delete pod %v", pod.Name) - if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { - glog.Errorf("Error deleting pod %v: %v", pod.Name, err) + // Eviction timeout! 
Evict all pods on the unhealthy node. + if util.Now().After(latestFullConditionTime.Add(s.podEvictionTimeout)) { + s.deletePods(node.Name) } } - return nil } // GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error -// occurs, an empty NodeList will be returned with a non-nil error info. The -// method only constructs spec fields for nodes. +// occurs, an empty NodeList will be returned with a non-nil error info. The method only +// constructs spec fields for nodes. func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { result := &api.NodeList{} for _, nodeID := range s.nodes { @@ -458,8 +465,8 @@ func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { } // GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error -// occurs, an empty NodeList will be returned with a non-nil error info. The -// method only constructs spec fields for nodes. +// occurs, an empty NodeList will be returned with a non-nil error info. The method only +// constructs spec fields for nodes. func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { result := &api.NodeList{} instances, ok := s.cloud.Instances() @@ -494,6 +501,27 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { return result, nil } +// deletePods will delete all pods from master running on given node. +func (s *NodeController) deletePods(nodeID string) error { + glog.V(2).Infof("Delete all pods from %v", nodeID) + // TODO: We don't yet have field selectors from client, see issue #1362. + pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) + if err != nil { + return err + } + for _, pod := range pods.Items { + if pod.Status.Host != nodeID { + continue + } + glog.V(2).Infof("Delete pod %v", pod.Name) + if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { + glog.Errorf("Error deleting pod %v: %v", pod.Name, err) + } + } + + return nil +} + // isRunningCloudProvider checks if cluster is running with cloud provider. func (s *NodeController) isRunningCloudProvider() bool { return s.cloud != nil && len(s.matchRE) > 0 @@ -517,3 +545,30 @@ func (s *NodeController) getCondition(node *api.Node, conditionType api.NodeCond } return nil } + +// latestConditionTime returns the latest condition timestamp for the node, regardless of condition status. +// If nothing matches, the node creation timestamp will be returned. +func (s *NodeController) latestConditionTime(node *api.Node, conditionType api.NodeConditionType) util.Time { + readyTime := node.ObjectMeta.CreationTimestamp + for _, condition := range node.Status.Conditions { + if condition.Type == conditionType && + condition.LastProbeTime.After(readyTime.Time) { + readyTime = condition.LastProbeTime + } + } + return readyTime +} + +// latestConditionTimeWithStatus returns the latest condition timestamp for the node, with given condition status. +// If nothing matches, the node creation timestamp will be returned. 
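[Editor's note] MonitorNodeStatus above makes two time-based decisions per node: mark NodeReady as ConditionNone once the latest Ready probe is older than nodeMonitorGracePeriod, and evict pods once the latest ConditionFull probe is older than podEvictionTimeout. A minimal sketch of that decision logic, not part of the patch: it collapses the two timestamps the controller tracks separately (latestConditionTime vs. latestConditionTimeWithStatus) into a single lastReadyProbe, and hard-codes a 5-minute eviction timeout in place of the --pod_eviction_timeout flag.

package main

import (
	"fmt"
	"time"
)

// Mirrors the constant this patch adds; see the comment on nodeMonitorGracePeriod above.
const nodeMonitorGracePeriod = 8 * time.Second

func main() {
	podEvictionTimeout := 5 * time.Minute
	lastReadyProbe := time.Now().Add(-10 * time.Second) // last status the kubelet posted
	now := time.Now()

	if now.After(lastReadyProbe.Add(nodeMonitorGracePeriod)) {
		fmt.Println("grace period exceeded: post NodeReady=ConditionNone (NotReady)")
	}
	if now.After(lastReadyProbe.Add(podEvictionTimeout)) {
		fmt.Println("eviction timeout exceeded: delete all pods bound to this node")
	}
}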
+func (s *NodeController) latestConditionTimeWithStatus(node *api.Node, conditionType api.NodeConditionType, conditionStatus api.ConditionStatus) util.Time { + readyTime := node.ObjectMeta.CreationTimestamp + for _, condition := range node.Status.Conditions { + if condition.Type == conditionType && + condition.Status == conditionStatus && + condition.LastProbeTime.After(readyTime.Time) { + readyTime = condition.LastProbeTime + } + } + return readyTime +} diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index 20bd2f47383..573eb6fd861 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -39,7 +39,7 @@ import ( ) // FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It -// allows test cases to have fine-grained control over mock behaviors. We alos need +// allows test cases to have fine-grained control over mock behaviors. We also need // PodsInterface and PodInterface to test list & delet pods, which is implemented in // the embeded client.Fake field. type FakeNodeHandler struct { @@ -377,7 +377,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) { } } -func TestSyncCloud(t *testing.T) { +func TestSyncCloudNodes(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeCloud *fake_cloud.FakeCloud @@ -464,7 +464,7 @@ func TestSyncCloud(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) - if err := nodeController.SyncCloud(); err != nil { + if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { @@ -485,7 +485,7 @@ func TestSyncCloud(t *testing.T) { } } -func TestSyncCloudDeletePods(t *testing.T) { +func TestSyncCloudNodesEvictPods(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeCloud *fake_cloud.FakeCloud @@ -546,7 +546,7 @@ func TestSyncCloudDeletePods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) - if err := nodeController.SyncCloud(); err != nil { + if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { @@ -688,6 +688,106 @@ func TestPopulateNodeAddresses(t *testing.T) { } } +func TestSyncNodeStatus(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + fakeKubeletClient *FakeKubeletClient + fakeCloud *fake_cloud.FakeCloud + expectedNodes []*api.Node + expectedRequestCount int + }{ + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + }, + fakeKubeletClient: &FakeKubeletClient{ + Status: probe.Success, + Err: nil, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + }, + expectedNodes: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + }, + { + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + }, + }, + 
Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "node1"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + }, + { + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + }, + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + }, + }, + }, + }, + expectedRequestCount: 3, // List + 2xUpdate + }, + } + + for _, item := range table { + nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) + if err := nodeController.SyncNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + for i := range item.fakeNodeHandler.UpdatedNodes { + conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions + for j := range conditions { + if conditions[j].LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } + if conditions[j].LastProbeTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + conditions[j].LastTransitionTime = util.Time{} + conditions[j].LastProbeTime = util.Time{} + } + } + if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { + t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) + } + // Second sync will also update the node. + item.fakeNodeHandler.RequestCount = 0 + if err := nodeController.SyncNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + } +} + func TestSyncNodeStatusTransitionTime(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler @@ -794,119 +894,7 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) { } } -func TestEvictTimeoutedPods(t *testing.T) { - table := []struct { - fakeNodeHandler *FakeNodeHandler - expectedRequestCount int - expectedActions []client.FakeAction - }{ - // Node created long time ago, with no status. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, - }, - // Node created recently, with no status. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Now(), - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: nil, - }, - // Node created long time ago, with status updated long time ago. 
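[Editor's note] TestSyncNodeStatus above, like TestMonitorNodeStatus later in this file, compares updated nodes with reflect.DeepEqual, so the volatile LastProbeTime and LastTransitionTime fields are first asserted to be non-zero and then zeroed out. A minimal, self-contained sketch of that normalization pattern, not part of the patch; the condition struct is a placeholder for api.NodeCondition:

package main

import (
	"fmt"
	"reflect"
	"time"
)

// condition keeps only the fields the comparison cares about.
type condition struct {
	Type          string
	Status        string
	LastProbeTime time.Time
}

func main() {
	got := []condition{{Type: "Ready", Status: "Full", LastProbeTime: time.Now()}}
	want := []condition{{Type: "Ready", Status: "Full"}} // timestamps left at their zero value

	// Assert the volatile timestamp was set, then clear it so DeepEqual
	// only compares the stable fields.
	for i := range got {
		if got[i].LastProbeTime.IsZero() {
			fmt.Println("unexpected zero last probe timestamp")
		}
		got[i].LastProbeTime = time.Time{}
	}
	if !reflect.DeepEqual(want, got) {
		fmt.Printf("expected %+v, got %+v\n", want, got)
	} else {
		fmt.Println("conditions match")
	}
}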
- { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, - }, - // Node created long time ago, with status updated recently. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - LastProbeTime: util.Now(), - }, - }, - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: nil, - }, - } - - for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) - if err := nodeController.EvictTimeoutedPods(); err != nil { - t.Errorf("unexpected error: %v", err) - } - if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { - t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) - } - if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) { - t.Errorf("actions differs, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions) - } - } -} - -func TestSyncNodeStatusDeletePods(t *testing.T) { +func TestSyncNodeStatusEvictPods(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeKubeletClient *FakeKubeletClient @@ -1056,77 +1044,153 @@ func TestSyncNodeStatusDeletePods(t *testing.T) { } } -func TestSyncNodeStatus(t *testing.T) { +func TestMonitorNodeStatus(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler - fakeKubeletClient *FakeKubeletClient - fakeCloud *fake_cloud.FakeCloud - expectedNodes []*api.Node expectedRequestCount int + expectedEvictPods bool + expectedNodes []*api.Node }{ + // Node created long time ago, with no status. 
{ fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0"), newNode("node1")}, - }, - fakeKubeletClient: &FakeKubeletClient{ - Status: probe.Success, - Err: nil, - }, - fakeCloud: &fake_cloud.FakeCloud{ - Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, }, + expectedRequestCount: 2, // List+Update + expectedEvictPods: true, expectedNodes: []*api.Node{ { - ObjectMeta: api.ObjectMeta{Name: "node0"}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", - }, - { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", - }, - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, - }, + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, - }, - { - ObjectMeta: api.ObjectMeta{Name: "node1"}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: fmt.Sprintf("Kubelet never posted node status"), + LastProbeTime: util.Time{}, + LastTransitionTime: util.Time{}, }, - { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", - }, - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, }, }, }, }, - expectedRequestCount: 3, // List + 2xUpdate + }, + // Node created recently, with no status. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Now(), + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 1, // List + expectedEvictPods: false, + expectedNodes: nil, + }, + // Node created long time ago, with status updated long time ago. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 2, // List+Update + expectedEvictPods: true, + expectedNodes: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: fmt.Sprintf("Kubelet stop posting node status"), + LastProbeTime: util.Time{}, + LastTransitionTime: util.Time{}, + }, + }, + }, + }, + }, + }, + // Node created long time ago, with status updated recently. 
+ { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + LastProbeTime: util.Now(), + LastTransitionTime: util.Time{}, + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 1, // List + expectedEvictPods: false, + expectedNodes: nil, }, } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) - if err := nodeController.SyncNodeStatus(); err != nil { + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) + if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } - if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) } for i := range item.fakeNodeHandler.UpdatedNodes { @@ -1145,13 +1209,14 @@ func TestSyncNodeStatus(t *testing.T) { if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) } - // Second sync will also update the node. - item.fakeNodeHandler.RequestCount = 0 - if err := nodeController.SyncNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) + podEvicted := false + for _, action := range item.fakeNodeHandler.Actions { + if action.Action == "delete-pod" { + podEvicted = true + } } - if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { - t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + if item.expectedEvictPods != podEvicted { + t.Errorf("expected pod eviction: %+v, got %+v", item.expectedEvictPods, podEvicted) } } }
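[Editor's note] TestMonitorNodeStatus above drives the controller against a FakeNodeHandler that records every API call and pod deletion, then asserts on RequestCount and on whether a "delete-pod" action was taken. A minimal, self-contained sketch of that fake-plus-table-test pattern, not part of the patch; fakeNodeHandler and monitorOnce below are simplified stand-ins, not the real test doubles:

package main

import (
	"fmt"
	"reflect"
	"time"
)

// fakeNodeHandler records how many API calls were made and which pods were deleted,
// much like the test file's FakeNodeHandler tracks RequestCount and Actions.
type fakeNodeHandler struct {
	RequestCount int
	DeletedPods  []string
}

func (f *fakeNodeHandler) ListNodes()            { f.RequestCount++ }
func (f *fakeNodeHandler) UpdateNode()           { f.RequestCount++ }
func (f *fakeNodeHandler) DeletePod(name string) { f.DeletedPods = append(f.DeletedPods, name) }

// monitorOnce mimics a single MonitorNodeStatus pass: a stale node gets a status
// update, and a node past the eviction timeout loses its pods.
func monitorOnce(f *fakeNodeHandler, lastProbe time.Time, gracePeriod, evictionTimeout time.Duration) {
	f.ListNodes()
	now := time.Now()
	if now.After(lastProbe.Add(gracePeriod)) {
		f.UpdateNode()
	}
	if now.After(lastProbe.Add(evictionTimeout)) {
		f.DeletePod("pod0")
	}
}

func main() {
	table := []struct {
		lastProbe            time.Time
		expectedRequestCount int
		expectedDeleted      []string
	}{
		{time.Now(), 1, nil},                              // fresh status: List only, no eviction
		{time.Now().Add(-time.Hour), 2, []string{"pod0"}}, // stale status: List+Update, evict
	}
	for i, item := range table {
		f := &fakeNodeHandler{}
		monitorOnce(f, item.lastProbe, 8*time.Second, 5*time.Minute)
		if f.RequestCount != item.expectedRequestCount {
			fmt.Printf("case %d: expected %d calls, got %d\n", i, item.expectedRequestCount, f.RequestCount)
		}
		if !reflect.DeepEqual(f.DeletedPods, item.expectedDeleted) {
			fmt.Printf("case %d: expected deletions %v, got %v\n", i, item.expectedDeleted, f.DeletedPods)
		}
	}
	fmt.Println("done")
}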