From 0d5f8dfde16ad100b97048b45e654762b3be2c19 Mon Sep 17 00:00:00 2001 From: Deyuan Deng Date: Wed, 11 Mar 2015 23:00:52 -0400 Subject: [PATCH 1/4] Node controller monitor node status --- .../app/controllermanager.go | 2 +- .../controller/nodecontroller.go | 273 +++++++----- .../controller/nodecontroller_test.go | 407 ++++++++++-------- 3 files changed, 401 insertions(+), 281 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 53371a31de4..2c823b54b5d 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -107,7 +107,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "The number of retries for initial node registration. Retry interval equals node_sync_period.") fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.") fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.") - fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controler send probes to kubelets and update NodeStatus.") + fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controller send probes to kubelets and update NodeStatus.") // TODO: Discover these by pinging the host machines, and rip out these flags. // TODO: in the meantime, use resource.QuantityFlag() instead of these fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node") diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 696a1f22083..2c82aef90e6 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -34,6 +34,20 @@ import ( "github.com/golang/glog" ) +const ( + // The constant is used if sync_nodes_status=False. NodeController will not proactively + // sync node status in this case, but will monitor node status updated from kubelet. If + // it doesn't receive update for this amount of time, it will start posting node NotReady + // condition. The amount of time when NodeController start evicting pods is controlled + // via flag 'pod_eviction_timeout'. + nodeMonitorGracePeriod = 8 * time.Second + // The constant is used if sync_nodes_status=False. It controls NodeController monitoring + // period, i.e. how often does NodeController check node status posted from kubelet. + // Theoretically, this value should be lower than nodeMonitorGracePeriod. + // TODO: Change node status monitor to watch based. + nodeMonitorPeriod = 5 * time.Second +) + var ( ErrRegistration = errors.New("unable to register all nodes.") ErrQueryIPAddress = errors.New("unable to query IP address.") @@ -53,8 +67,6 @@ type NodeController struct { } // NewNodeController returns a new node controller to sync instances from cloudprovider. -// TODO: NodeController health checker should be a separate package other than -// kubeletclient, node health check != kubelet health check. func NewNodeController( cloud cloudprovider.Interface, matchRE string, @@ -77,36 +89,34 @@ func NewNodeController( } } -// Run creates initial node list and start syncing instances from cloudprovider if any. +// Run creates initial node list and start syncing instances from cloudprovider, if any. // It also starts syncing cluster node status. // 1. 
RegisterNodes() is called only once to register all initial nodes (from cloudprovider // or from command line flag). To make cluster bootstrap faster, node controller populates // node addresses. -// 2. SyncCloud() is called periodically (if enabled) to sync instances from cloudprovider. +// 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider. // Node created here will only have specs. // 3. SyncNodeStatus() is called periodically (if enabled) to sync node status for nodes in -// k8s cluster. +// k8s cluster. If not enabled, MonitorNodeStatus() is called otherwise to monitor node +// status posted from kubelet. func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) { // Register intial set of nodes with their status set. var nodes *api.NodeList var err error if s.isRunningCloudProvider() { if syncNodeList { - nodes, err = s.GetCloudNodesWithSpec() - if err != nil { + if nodes, err = s.GetCloudNodesWithSpec(); err != nil { glog.Errorf("Error loading initial node from cloudprovider: %v", err) } } else { nodes = &api.NodeList{} } } else { - nodes, err = s.GetStaticNodesWithSpec() - if err != nil { + if nodes, err = s.GetStaticNodesWithSpec(); err != nil { glog.Errorf("Error loading initial static nodes: %v", err) } } - nodes, err = s.PopulateAddresses(nodes) - if err != nil { + if nodes, err = s.PopulateAddresses(nodes); err != nil { glog.Errorf("Error getting nodes ips: %v", err) } if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil { @@ -116,26 +126,25 @@ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus // Start syncing node list from cloudprovider. if syncNodeList && s.isRunningCloudProvider() { go util.Forever(func() { - if err = s.SyncCloud(); err != nil { + if err = s.SyncCloudNodes(); err != nil { glog.Errorf("Error syncing cloud: %v", err) } }, period) } + // Start syncing or monitoring node status. if syncNodeStatus { - // Start syncing node status. go util.Forever(func() { if err = s.SyncNodeStatus(); err != nil { glog.Errorf("Error syncing status: %v", err) } }, period) } else { - // Start checking node reachability and evicting timeouted pods. go util.Forever(func() { - if err = s.EvictTimeoutedPods(); err != nil { - glog.Errorf("Error evicting timeouted pods: %v", err) + if err = s.MonitorNodeStatus(); err != nil { + glog.Errorf("Error monitoring node status: %v", err) } - }, period) + }, nodeMonitorPeriod) } } @@ -173,8 +182,8 @@ func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retr } } -// SyncCloud synchronizes the list of instances from cloudprovider to master server. -func (s *NodeController) SyncCloud() error { +// SyncCloudNodes synchronizes the list of instances from cloudprovider to master server. +func (s *NodeController) SyncCloudNodes() error { matches, err := s.GetCloudNodesWithSpec() if err != nil { return err @@ -220,8 +229,7 @@ func (s *NodeController) SyncNodeStatus() error { if err != nil { return err } - nodes = s.UpdateNodesStatus(nodes) - nodes, err = s.PopulateAddresses(nodes) + nodes, err = s.PopulateNodesStatus(nodes) if err != nil { return err } @@ -238,90 +246,25 @@ func (s *NodeController) SyncNodeStatus() error { return nil } -// EvictTimeoutedPods verifies if nodes are reachable by checking the time of last probe -// and deletes pods from not reachable nodes. 
-func (s *NodeController) EvictTimeoutedPods() error { - nodes, err := s.kubeClient.Nodes().List() - if err != nil { - return err - } - for _, node := range nodes.Items { - if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) { - s.deletePods(node.Name) - } - } - return nil -} - -func latestReadyTime(node *api.Node) util.Time { - readyTime := node.ObjectMeta.CreationTimestamp - for _, condition := range node.Status.Conditions { - if condition.Type == api.NodeReady && - condition.Status == api.ConditionFull && - condition.LastProbeTime.After(readyTime.Time) { - readyTime = condition.LastProbeTime - } - } - return readyTime -} - -// PopulateAddresses queries Address for given list of nodes. -func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { - if s.isRunningCloudProvider() { - instances, ok := s.cloud.Instances() - if !ok { - return nodes, ErrCloudInstance - } - for i := range nodes.Items { - node := &nodes.Items[i] - nodeAddresses, err := instances.NodeAddresses(node.Name) - if err != nil { - glog.Errorf("error getting instance addresses for %s: %v", node.Name, err) - } else { - node.Status.Addresses = nodeAddresses - } - } - } else { - for i := range nodes.Items { - node := &nodes.Items[i] - addr := net.ParseIP(node.Name) - if addr != nil { - address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} - node.Status.Addresses = []api.NodeAddress{address} - } else { - addrs, err := s.lookupIP(node.Name) - if err != nil { - glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) - } else if len(addrs) == 0 { - glog.Errorf("No ip address for node %v", node.Name) - } else { - address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()} - node.Status.Addresses = []api.NodeAddress{address} - } - } - } - } - return nodes, nil -} - -// UpdateNodesStatus performs various condition checks for given list of nodes. -func (s *NodeController) UpdateNodesStatus(nodes *api.NodeList) *api.NodeList { +// PopulateNodesStatus populates node status for given list of nodes. +func (s *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) { var wg sync.WaitGroup wg.Add(len(nodes.Items)) for i := range nodes.Items { go func(node *api.Node) { node.Status.Conditions = s.DoCheck(node) - if err := s.updateNodeInfo(node); err != nil { + if err := s.populateNodeInfo(node); err != nil { glog.Errorf("Can't collect information for node %s: %v", node.Name, err) } wg.Done() }(&nodes.Items[i]) } wg.Wait() - return nodes + return s.PopulateAddresses(nodes) } -func (s *NodeController) updateNodeInfo(node *api.Node) error { +// populateNodeInfo gets node info from kubelet and update the node. +func (s *NodeController) populateNodeInfo(node *api.Node) error { nodeInfo, err := s.kubeletClient.GetNodeInfo(node.Name) if err != nil { return err @@ -341,7 +284,6 @@ func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition { oldReadyCondition := s.getCondition(node, api.NodeReady) newReadyCondition := s.checkNodeReady(node) s.updateLastTransitionTime(oldReadyCondition, newReadyCondition) - if newReadyCondition.Status != api.ConditionFull { // Node is not ready for this probe, we need to check if pods need to be deleted. if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { @@ -421,30 +363,95 @@ func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition { } } -// deletePods will delete all pods from master running on given node. 
-func (s *NodeController) deletePods(nodeID string) error { - glog.V(2).Infof("Delete all pods from %v", nodeID) - // TODO: We don't yet have field selectors from client, see issue #1362. - pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) +// PopulateAddresses queries Address for given list of nodes. +func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { + if s.isRunningCloudProvider() { + instances, ok := s.cloud.Instances() + if !ok { + return nodes, ErrCloudInstance + } + for i := range nodes.Items { + node := &nodes.Items[i] + nodeAddresses, err := instances.NodeAddresses(node.Name) + if err != nil { + glog.Errorf("error getting instance addresses for %s: %v", node.Name, err) + } else { + node.Status.Addresses = nodeAddresses + } + } + } else { + for i := range nodes.Items { + node := &nodes.Items[i] + addr := net.ParseIP(node.Name) + if addr != nil { + address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} + node.Status.Addresses = []api.NodeAddress{address} + } else { + addrs, err := s.lookupIP(node.Name) + if err != nil { + glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) + } else if len(addrs) == 0 { + glog.Errorf("No ip address for node %v", node.Name) + } else { + address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()} + node.Status.Addresses = []api.NodeAddress{address} + } + } + } + } + return nodes, nil +} + +// MonitorNodeStatus verifies node status are constantly updated by kubelet, and if +// not, post node NotReady status. It also evicts all pods if node is not ready for +// a long period of time. +func (s *NodeController) MonitorNodeStatus() error { + nodes, err := s.kubeClient.Nodes().List() if err != nil { return err } - for _, pod := range pods.Items { - if pod.Status.Host != nodeID { - continue + for i := range nodes.Items { + node := &nodes.Items[i] + // Precompute condition times to avoid deep copy of node status (We'll modify node for updating, + // and NodeStatus.Conditions is an array, which makes assignment copy not useful). + latestConditionTime := s.latestConditionTime(node, api.NodeReady) + latestFullConditionTime := s.latestConditionTimeWithStatus(node, api.NodeReady, api.ConditionFull) + // Grace period has passed, post node NotReady condition to master, without contacting kubelet. + if util.Now().After(latestConditionTime.Add(nodeMonitorGracePeriod)) { + readyCondition := s.getCondition(node, api.NodeReady) + if readyCondition == nil { + node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: fmt.Sprintf("Kubelet never posted node status"), + LastProbeTime: util.Now(), + LastTransitionTime: util.Now(), + }) + } else { + readyCondition.Status = api.ConditionNone + readyCondition.Reason = fmt.Sprintf("Kubelet stop posting node status") + readyCondition.LastProbeTime = util.Now() + if readyCondition.Status == api.ConditionFull { + readyCondition.LastTransitionTime = util.Now() + } + } + glog.V(2).Infof("updating node %v, whose status hasn't been updated by kubelet for a long time", node.Name) + _, err = s.kubeClient.Nodes().Update(node) + if err != nil { + glog.Errorf("error updating node %s: %v", node.Name, err) + } } - glog.V(2).Infof("Delete pod %v", pod.Name) - if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { - glog.Errorf("Error deleting pod %v: %v", pod.Name, err) + // Eviction timeout! 
Evict all pods on the unhealthy node. + if util.Now().After(latestFullConditionTime.Add(s.podEvictionTimeout)) { + s.deletePods(node.Name) } } - return nil } // GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error -// occurs, an empty NodeList will be returned with a non-nil error info. The -// method only constructs spec fields for nodes. +// occurs, an empty NodeList will be returned with a non-nil error info. The method only +// constructs spec fields for nodes. func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { result := &api.NodeList{} for _, nodeID := range s.nodes { @@ -458,8 +465,8 @@ func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { } // GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error -// occurs, an empty NodeList will be returned with a non-nil error info. The -// method only constructs spec fields for nodes. +// occurs, an empty NodeList will be returned with a non-nil error info. The method only +// constructs spec fields for nodes. func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { result := &api.NodeList{} instances, ok := s.cloud.Instances() @@ -494,6 +501,27 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { return result, nil } +// deletePods will delete all pods from master running on given node. +func (s *NodeController) deletePods(nodeID string) error { + glog.V(2).Infof("Delete all pods from %v", nodeID) + // TODO: We don't yet have field selectors from client, see issue #1362. + pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) + if err != nil { + return err + } + for _, pod := range pods.Items { + if pod.Status.Host != nodeID { + continue + } + glog.V(2).Infof("Delete pod %v", pod.Name) + if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { + glog.Errorf("Error deleting pod %v: %v", pod.Name, err) + } + } + + return nil +} + // isRunningCloudProvider checks if cluster is running with cloud provider. func (s *NodeController) isRunningCloudProvider() bool { return s.cloud != nil && len(s.matchRE) > 0 @@ -517,3 +545,30 @@ func (s *NodeController) getCondition(node *api.Node, conditionType api.NodeCond } return nil } + +// latestConditionTime returns the latest condition timestamp for the node, regardless of condition status. +// If nothing matches, the node creation timestamp will be returned. +func (s *NodeController) latestConditionTime(node *api.Node, conditionType api.NodeConditionType) util.Time { + readyTime := node.ObjectMeta.CreationTimestamp + for _, condition := range node.Status.Conditions { + if condition.Type == conditionType && + condition.LastProbeTime.After(readyTime.Time) { + readyTime = condition.LastProbeTime + } + } + return readyTime +} + +// latestConditionTimeWithStatus returns the latest condition timestamp for the node, with given condition status. +// If nothing matches, the node creation timestamp will be returned. 
+func (s *NodeController) latestConditionTimeWithStatus(node *api.Node, conditionType api.NodeConditionType, conditionStatus api.ConditionStatus) util.Time { + readyTime := node.ObjectMeta.CreationTimestamp + for _, condition := range node.Status.Conditions { + if condition.Type == conditionType && + condition.Status == conditionStatus && + condition.LastProbeTime.After(readyTime.Time) { + readyTime = condition.LastProbeTime + } + } + return readyTime +} diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index 20bd2f47383..573eb6fd861 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -39,7 +39,7 @@ import ( ) // FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It -// allows test cases to have fine-grained control over mock behaviors. We alos need +// allows test cases to have fine-grained control over mock behaviors. We also need // PodsInterface and PodInterface to test list & delet pods, which is implemented in // the embeded client.Fake field. type FakeNodeHandler struct { @@ -377,7 +377,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) { } } -func TestSyncCloud(t *testing.T) { +func TestSyncCloudNodes(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeCloud *fake_cloud.FakeCloud @@ -464,7 +464,7 @@ func TestSyncCloud(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) - if err := nodeController.SyncCloud(); err != nil { + if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { @@ -485,7 +485,7 @@ func TestSyncCloud(t *testing.T) { } } -func TestSyncCloudDeletePods(t *testing.T) { +func TestSyncCloudNodesEvictPods(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeCloud *fake_cloud.FakeCloud @@ -546,7 +546,7 @@ func TestSyncCloudDeletePods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) - if err := nodeController.SyncCloud(); err != nil { + if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { @@ -688,6 +688,106 @@ func TestPopulateNodeAddresses(t *testing.T) { } } +func TestSyncNodeStatus(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + fakeKubeletClient *FakeKubeletClient + fakeCloud *fake_cloud.FakeCloud + expectedNodes []*api.Node + expectedRequestCount int + }{ + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + }, + fakeKubeletClient: &FakeKubeletClient{ + Status: probe.Success, + Err: nil, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + }, + expectedNodes: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + }, + { + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + }, + }, + 
Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "node1"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + }, + { + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + }, + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + }, + }, + }, + }, + expectedRequestCount: 3, // List + 2xUpdate + }, + } + + for _, item := range table { + nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) + if err := nodeController.SyncNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + for i := range item.fakeNodeHandler.UpdatedNodes { + conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions + for j := range conditions { + if conditions[j].LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } + if conditions[j].LastProbeTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") + } + conditions[j].LastTransitionTime = util.Time{} + conditions[j].LastProbeTime = util.Time{} + } + } + if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { + t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) + } + // Second sync will also update the node. + item.fakeNodeHandler.RequestCount = 0 + if err := nodeController.SyncNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + } +} + func TestSyncNodeStatusTransitionTime(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler @@ -794,119 +894,7 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) { } } -func TestEvictTimeoutedPods(t *testing.T) { - table := []struct { - fakeNodeHandler *FakeNodeHandler - expectedRequestCount int - expectedActions []client.FakeAction - }{ - // Node created long time ago, with no status. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, - }, - // Node created recently, with no status. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Now(), - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: nil, - }, - // Node created long time ago, with status updated long time ago. 
- { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, - }, - // Node created long time ago, with status updated recently. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - LastProbeTime: util.Now(), - }, - }, - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: nil, - }, - } - - for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) - if err := nodeController.EvictTimeoutedPods(); err != nil { - t.Errorf("unexpected error: %v", err) - } - if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { - t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) - } - if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) { - t.Errorf("actions differs, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions) - } - } -} - -func TestSyncNodeStatusDeletePods(t *testing.T) { +func TestSyncNodeStatusEvictPods(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeKubeletClient *FakeKubeletClient @@ -1056,77 +1044,153 @@ func TestSyncNodeStatusDeletePods(t *testing.T) { } } -func TestSyncNodeStatus(t *testing.T) { +func TestMonitorNodeStatus(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler - fakeKubeletClient *FakeKubeletClient - fakeCloud *fake_cloud.FakeCloud - expectedNodes []*api.Node expectedRequestCount int + expectedEvictPods bool + expectedNodes []*api.Node }{ + // Node created long time ago, with no status. 
{ fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0"), newNode("node1")}, - }, - fakeKubeletClient: &FakeKubeletClient{ - Status: probe.Success, - Err: nil, - }, - fakeCloud: &fake_cloud.FakeCloud{ - Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, }, + expectedRequestCount: 2, // List+Update + expectedEvictPods: true, expectedNodes: []*api.Node{ { - ObjectMeta: api.ObjectMeta{Name: "node0"}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", - }, - { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", - }, - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, - }, + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, - }, - { - ObjectMeta: api.ObjectMeta{Name: "node1"}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: fmt.Sprintf("Kubelet never posted node status"), + LastProbeTime: util.Time{}, + LastTransitionTime: util.Time{}, }, - { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", - }, - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, }, }, }, }, - expectedRequestCount: 3, // List + 2xUpdate + }, + // Node created recently, with no status. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Now(), + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 1, // List + expectedEvictPods: false, + expectedNodes: nil, + }, + // Node created long time ago, with status updated long time ago. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 2, // List+Update + expectedEvictPods: true, + expectedNodes: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: fmt.Sprintf("Kubelet stop posting node status"), + LastProbeTime: util.Time{}, + LastTransitionTime: util.Time{}, + }, + }, + }, + }, + }, + }, + // Node created long time ago, with status updated recently. 
+ { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + LastProbeTime: util.Now(), + LastTransitionTime: util.Time{}, + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 1, // List + expectedEvictPods: false, + expectedNodes: nil, }, } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) - if err := nodeController.SyncNodeStatus(); err != nil { + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) + if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } - if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) } for i := range item.fakeNodeHandler.UpdatedNodes { @@ -1145,13 +1209,14 @@ func TestSyncNodeStatus(t *testing.T) { if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) } - // Second sync will also update the node. - item.fakeNodeHandler.RequestCount = 0 - if err := nodeController.SyncNodeStatus(); err != nil { - t.Errorf("unexpected error: %v", err) + podEvicted := false + for _, action := range item.fakeNodeHandler.Actions { + if action.Action == "delete-pod" { + podEvicted = true + } } - if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { - t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + if item.expectedEvictPods != podEvicted { + t.Errorf("expected pod eviction: %+v, got %+v", item.expectedEvictPods, podEvicted) } } } From cf548765c98c759b34e1cb207a117d120db624aa Mon Sep 17 00:00:00 2001 From: Deyuan Deng Date: Fri, 13 Mar 2015 20:59:25 -0400 Subject: [PATCH 2/4] Change kubelet update frequency to 2s, and make it a constant. --- cmd/kubelet/app/server.go | 19 ++++++------------- .../controller/nodecontroller.go | 18 ++++++++++++++---- pkg/kubelet/kubelet.go | 14 +++++++------- 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 9bc62e25d5d..745d61b3d11 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -51,7 +51,6 @@ type KubeletServer struct { SyncFrequency time.Duration FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration - StatusUpdateFrequency time.Duration ManifestURL string EnableServer bool Address util.IP @@ -85,13 +84,12 @@ type KubeletServer struct { // NewKubeletServer will create a new KubeletServer with default values. 
func NewKubeletServer() *KubeletServer { return &KubeletServer{ - SyncFrequency: 10 * time.Second, - FileCheckFrequency: 20 * time.Second, - HTTPCheckFrequency: 20 * time.Second, - StatusUpdateFrequency: 20 * time.Second, - EnableServer: true, - Address: util.IP(net.ParseIP("127.0.0.1")), - Port: ports.KubeletPort, + SyncFrequency: 10 * time.Second, + FileCheckFrequency: 20 * time.Second, + HTTPCheckFrequency: 20 * time.Second, + EnableServer: true, + Address: util.IP(net.ParseIP("127.0.0.1")), + Port: ports.KubeletPort, PodInfraContainerImage: kubelet.PodInfraContainerImage, RootDirectory: defaultRootDir, RegistryBurst: 10, @@ -112,7 +110,6 @@ func NewKubeletServer() *KubeletServer { func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files") fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config") - fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master") fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data") fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data") fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest") @@ -179,7 +176,6 @@ func (s *KubeletServer) Run(_ []string) error { RootDirectory: s.RootDirectory, ConfigFile: s.Config, ManifestURL: s.ManifestURL, - StatusUpdateFrequency: s.StatusUpdateFrequency, FileCheckFrequency: s.FileCheckFrequency, HTTPCheckFrequency: s.HTTPCheckFrequency, PodInfraContainerImage: s.PodInfraContainerImage, @@ -285,7 +281,6 @@ func SimpleKubelet(client *client.Client, EnableDebuggingHandlers: true, HTTPCheckFrequency: 1 * time.Second, FileCheckFrequency: 1 * time.Second, - StatusUpdateFrequency: 3 * time.Second, SyncFrequency: 3 * time.Second, MinimumGCAge: 10 * time.Second, MaxPerPodContainerCount: 5, @@ -380,7 +375,6 @@ type KubeletConfig struct { RootDirectory string ConfigFile string ManifestURL string - StatusUpdateFrequency time.Duration FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration Hostname string @@ -446,7 +440,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub kc.StreamingConnectionIdleTimeout, kc.Recorder, kc.CadvisorInterface, - kc.StatusUpdateFrequency, kc.ImageGCPolicy) if err != nil { diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 2c82aef90e6..7a3d545cfac 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -39,8 +39,12 @@ const ( // sync node status in this case, but will monitor node status updated from kubelet. If // it doesn't receive update for this amount of time, it will start posting node NotReady // condition. The amount of time when NodeController start evicting pods is controlled - // via flag 'pod_eviction_timeout'. + // via flag 'pod_eviction_timeout'. Note: be cautious when changing nodeMonitorGracePeriod, + // it must work with kubelet.nodeStatusUpdateFrequency. nodeMonitorGracePeriod = 8 * time.Second + // The constant is used if sync_nodes_status=False, and for node startup. When node + // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period. 
+ nodeStartupGracePeriod = 30 * time.Second // The constant is used if sync_nodes_status=False. It controls NodeController monitoring // period, i.e. how often does NodeController check node status posted from kubelet. // Theoretically, this value should be lower than nodeMonitorGracePeriod. @@ -412,12 +416,18 @@ func (s *NodeController) MonitorNodeStatus() error { } for i := range nodes.Items { node := &nodes.Items[i] - // Precompute condition times to avoid deep copy of node status (We'll modify node for updating, - // and NodeStatus.Conditions is an array, which makes assignment copy not useful). + // Precompute all condition times to avoid deep copy of node status (We'll modify node for + // updating, and NodeStatus.Conditions is an array, which makes assignment copy not useful). latestConditionTime := s.latestConditionTime(node, api.NodeReady) + var gracePeriod time.Duration + if latestConditionTime == node.CreationTimestamp { + gracePeriod = nodeStartupGracePeriod + } else { + gracePeriod = nodeMonitorGracePeriod + } latestFullConditionTime := s.latestConditionTimeWithStatus(node, api.NodeReady, api.ConditionFull) // Grace period has passed, post node NotReady condition to master, without contacting kubelet. - if util.Now().After(latestConditionTime.Add(nodeMonitorGracePeriod)) { + if util.Now().After(latestConditionTime.Add(gracePeriod)) { readyCondition := s.getCondition(node, api.NodeReady) if readyCondition == nil { node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{ diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 328c85be6ba..75e27f27868 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -75,8 +75,10 @@ const ( initialNodeStatusUpdateFrequency = 100 * time.Millisecond nodeStatusUpdateFrequencyInc = 500 * time.Millisecond - // The retry count for updating node status at each sync period. - nodeStatusUpdateRetry = 5 + // Node status update frequency and retry count. Note: be cautious when changing nodeStatusUpdateFrequency, + // it must work with nodecontroller.nodeMonitorGracePeriod. 
+ nodeStatusUpdateFrequency = 2 * time.Second + nodeStatusUpdateRetry = 5 ) var ( @@ -124,7 +126,6 @@ func NewMainKubelet( streamingConnectionIdleTimeout time.Duration, recorder record.EventRecorder, cadvisorInterface cadvisor.Interface, - statusUpdateFrequency time.Duration, imageGCPolicy ImageGCPolicy) (*Kubelet, error) { if rootDirectory == "" { return nil, fmt.Errorf("invalid root directory %q", rootDirectory) @@ -202,7 +203,6 @@ func NewMainKubelet( dockerClient: dockerClient, kubeClient: kubeClient, rootDirectory: rootDirectory, - statusUpdateFrequency: statusUpdateFrequency, resyncInterval: resyncInterval, podInfraContainerImage: podInfraContainerImage, containerIDToRef: map[string]*api.ObjectReference{}, @@ -275,7 +275,6 @@ type Kubelet struct { rootDirectory string podInfraContainerImage string podWorkers *podWorkers - statusUpdateFrequency time.Duration resyncInterval time.Duration sourcesReady SourcesReadyFn @@ -532,7 +531,8 @@ func (kl *Kubelet) syncNodeStatus() { if kl.kubeClient == nil { return } - for feq := initialNodeStatusUpdateFrequency; feq < kl.statusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc { + + for feq := initialNodeStatusUpdateFrequency; feq < nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc { select { case <-time.After(feq): if err := kl.updateNodeStatus(); err != nil { @@ -542,7 +542,7 @@ func (kl *Kubelet) syncNodeStatus() { } for { select { - case <-time.After(kl.statusUpdateFrequency): + case <-time.After(nodeStatusUpdateFrequency): if err := kl.updateNodeStatus(); err != nil { glog.Errorf("Unable to update node status: %v", err) } From b51d491f05dfabb5c55273dc16b727d1c2cca8c0 Mon Sep 17 00:00:00 2001 From: Deyuan Deng Date: Fri, 20 Mar 2015 13:35:41 -0400 Subject: [PATCH 3/4] Delete all pods based on condition transition time. --- .../controller/nodecontroller.go | 114 ++++++----- .../controller/nodecontroller_test.go | 185 +++++++++++++++--- pkg/kubelet/kubelet.go | 19 +- pkg/kubelet/kubelet_test.go | 38 ++-- 4 files changed, 262 insertions(+), 94 deletions(-) diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 7a3d545cfac..5c4017767ae 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -39,8 +39,18 @@ const ( // sync node status in this case, but will monitor node status updated from kubelet. If // it doesn't receive update for this amount of time, it will start posting node NotReady // condition. The amount of time when NodeController start evicting pods is controlled - // via flag 'pod_eviction_timeout'. Note: be cautious when changing nodeMonitorGracePeriod, - // it must work with kubelet.nodeStatusUpdateFrequency. + // via flag 'pod_eviction_timeout'. + // Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency + // in kubelet. There are several constraints: + // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where + // N means number of retries allowed for kubelet to post node status. It is pointless + // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there + // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. + // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes + // longer for user to see up-to-date node status. + // 3. nodeStatusUpdateFrequency needs to be large enough for Kubelet to generate node + // status. 
Kubelet may fail to update node status reliablly if the value is too small, + // as it takes time to gather all necessary node information. nodeMonitorGracePeriod = 8 * time.Second // The constant is used if sync_nodes_status=False, and for node startup. When node // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period. @@ -94,15 +104,15 @@ func NewNodeController( } // Run creates initial node list and start syncing instances from cloudprovider, if any. -// It also starts syncing cluster node status. +// It also starts syncing or monitoring cluster node status. // 1. RegisterNodes() is called only once to register all initial nodes (from cloudprovider // or from command line flag). To make cluster bootstrap faster, node controller populates // node addresses. // 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider. // Node created here will only have specs. -// 3. SyncNodeStatus() is called periodically (if enabled) to sync node status for nodes in -// k8s cluster. If not enabled, MonitorNodeStatus() is called otherwise to monitor node -// status posted from kubelet. +// 3. Depending on how k8s is configured, there are two ways of syncing the node status: +// 3.1 SyncProbedNodeStatus() is called periodically to sync node status for nodes in k8s cluster. +// 3.2 MonitorNodeStatus() is called periodically to monitor node status posted from kubelet. func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) { // Register intial set of nodes with their status set. var nodes *api.NodeList @@ -139,7 +149,7 @@ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus // Start syncing or monitoring node status. if syncNodeStatus { go util.Forever(func() { - if err = s.SyncNodeStatus(); err != nil { + if err = s.SyncProbedNodeStatus(); err != nil { glog.Errorf("Error syncing status: %v", err) } }, period) @@ -227,8 +237,8 @@ func (s *NodeController) SyncCloudNodes() error { return nil } -// SyncNodeStatus synchronizes cluster nodes status to master server. -func (s *NodeController) SyncNodeStatus() error { +// SyncProbedNodeStatus synchronizes cluster nodes status to master server. +func (s *NodeController) SyncProbedNodeStatus() error { nodes, err := s.kubeClient.Nodes().List() if err != nil { return err @@ -415,45 +425,70 @@ func (s *NodeController) MonitorNodeStatus() error { return err } for i := range nodes.Items { - node := &nodes.Items[i] - // Precompute all condition times to avoid deep copy of node status (We'll modify node for - // updating, and NodeStatus.Conditions is an array, which makes assignment copy not useful). - latestConditionTime := s.latestConditionTime(node, api.NodeReady) var gracePeriod time.Duration - if latestConditionTime == node.CreationTimestamp { + var lastReadyCondition api.NodeCondition + node := &nodes.Items[i] + readyCondition := s.getCondition(node, api.NodeReady) + if readyCondition == nil { + // If ready condition is nil, then kubelet (or nodecontroller) never posted node status. + // A fake ready condition is created, where LastProbeTime and LastTransitionTime is set + // to node.CreationTimestamp to avoid handle the corner case. 
+ lastReadyCondition = api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionUnknown, + LastProbeTime: node.CreationTimestamp, + LastTransitionTime: node.CreationTimestamp, + } gracePeriod = nodeStartupGracePeriod } else { + // If ready condition is not nil, make a copy of it, since we may modify it in place later. + lastReadyCondition = *readyCondition gracePeriod = nodeMonitorGracePeriod } - latestFullConditionTime := s.latestConditionTimeWithStatus(node, api.NodeReady, api.ConditionFull) - // Grace period has passed, post node NotReady condition to master, without contacting kubelet. - if util.Now().After(latestConditionTime.Add(gracePeriod)) { - readyCondition := s.getCondition(node, api.NodeReady) + + // Check last time when NodeReady was updated. + if util.Now().After(lastReadyCondition.LastProbeTime.Add(gracePeriod)) { + // NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown + // (regardless of its current value) in the master, without contacting kubelet. if readyCondition == nil { + glog.V(2).Infof("node %v is never updated by kubelet") node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{ Type: api.NodeReady, - Status: api.ConditionNone, + Status: api.ConditionUnknown, Reason: fmt.Sprintf("Kubelet never posted node status"), LastProbeTime: util.Now(), LastTransitionTime: util.Now(), }) } else { - readyCondition.Status = api.ConditionNone - readyCondition.Reason = fmt.Sprintf("Kubelet stop posting node status") + // Note here the out-dated condition can be the one posted by nodecontroller + // itself before. We keep posting the status to keep LastProbeTime fresh. + glog.V(2).Infof("node %v hasn't been updated for a while, last ready condition is %+v", node.Name, readyCondition) + readyCondition.Status = api.ConditionUnknown + readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status") readyCondition.LastProbeTime = util.Now() - if readyCondition.Status == api.ConditionFull { + if lastReadyCondition.Status != api.ConditionUnknown { readyCondition.LastTransitionTime = util.Now() } } - glog.V(2).Infof("updating node %v, whose status hasn't been updated by kubelet for a long time", node.Name) _, err = s.kubeClient.Nodes().Update(node) if err != nil { glog.Errorf("error updating node %s: %v", node.Name, err) } } - // Eviction timeout! Evict all pods on the unhealthy node. - if util.Now().After(latestFullConditionTime.Add(s.podEvictionTimeout)) { - s.deletePods(node.Name) + + if readyCondition != nil { + // Check eviction timeout. + if lastReadyCondition.Status == api.ConditionNone && + util.Now().After(lastReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { + // Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node. + s.deletePods(node.Name) + } + if lastReadyCondition.Status == api.ConditionUnknown && + util.Now().After(lastReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout-gracePeriod)) { + // Same as above. Note however, since condition unknown is posted by node controller, which means we + // need to substract monitoring grace period in order to get the real 'podEvictionTimeout'. + s.deletePods(node.Name) + } } } return nil @@ -555,30 +590,3 @@ func (s *NodeController) getCondition(node *api.Node, conditionType api.NodeCond } return nil } - -// latestConditionTime returns the latest condition timestamp for the node, regardless of condition status. -// If nothing matches, the node creation timestamp will be returned. 
-func (s *NodeController) latestConditionTime(node *api.Node, conditionType api.NodeConditionType) util.Time { - readyTime := node.ObjectMeta.CreationTimestamp - for _, condition := range node.Status.Conditions { - if condition.Type == conditionType && - condition.LastProbeTime.After(readyTime.Time) { - readyTime = condition.LastProbeTime - } - } - return readyTime -} - -// latestConditionTimeWithStatus returns the latest condition timestamp for the node, with given condition status. -// If nothing matches, the node creation timestamp will be returned. -func (s *NodeController) latestConditionTimeWithStatus(node *api.Node, conditionType api.NodeConditionType, conditionStatus api.ConditionStatus) util.Time { - readyTime := node.ObjectMeta.CreationTimestamp - for _, condition := range node.Status.Conditions { - if condition.Type == conditionType && - condition.Status == conditionStatus && - condition.LastProbeTime.After(readyTime.Time) { - readyTime = condition.LastProbeTime - } - } - return readyTime -} diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index 573eb6fd861..6846ebfd2cf 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -688,7 +688,7 @@ func TestPopulateNodeAddresses(t *testing.T) { } } -func TestSyncNodeStatus(t *testing.T) { +func TestSyncProbedNodeStatus(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeKubeletClient *FakeKubeletClient @@ -755,7 +755,7 @@ func TestSyncNodeStatus(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) - if err := nodeController.SyncNodeStatus(); err != nil { + if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { @@ -779,7 +779,7 @@ func TestSyncNodeStatus(t *testing.T) { } // Second sync will also update the node. 
item.fakeNodeHandler.RequestCount = 0 - if err := nodeController.SyncNodeStatus(); err != nil { + if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { @@ -788,7 +788,7 @@ func TestSyncNodeStatus(t *testing.T) { } } -func TestSyncNodeStatusTransitionTime(t *testing.T) { +func TestSyncProbedNodeStatusTransitionTime(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeKubeletClient *FakeKubeletClient @@ -870,7 +870,7 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) { nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) } - if err := nodeController.SyncNodeStatus(); err != nil { + if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { @@ -894,7 +894,7 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) { } } -func TestSyncNodeStatusEvictPods(t *testing.T) { +func TestSyncProbedNodeStatusEvictPods(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeKubeletClient *FakeKubeletClient @@ -1032,7 +1032,7 @@ func TestSyncNodeStatusEvictPods(t *testing.T) { nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) } - if err := nodeController.SyncNodeStatus(); err != nil { + if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { @@ -1044,11 +1044,159 @@ func TestSyncNodeStatusEvictPods(t *testing.T) { } } -func TestMonitorNodeStatus(t *testing.T) { +func TestMonitorNodeStatusEvictPods(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + expectedEvictPods bool + }{ + // Node created recently, with no status (happens only at cluster startup). + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Now(), + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedEvictPods: false, + }, + // Node created long time ago, with not ready status updated by kubelet for a short time. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionNone, + LastProbeTime: util.Now(), + LastTransitionTime: util.Now(), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedEvictPods: false, + }, + // Node created long time ago, with not ready status updated by kubelet for a long time. 
+ { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionUnknown, + LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedEvictPods: true, + }, + // Node created long time ago, with unknown status updated by node controller for a short time. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionUnknown, + LastProbeTime: util.Now(), + LastTransitionTime: util.Now(), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedEvictPods: false, + }, + // Node created long time ago, with unknown status updated by node controller for a long time. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionUnknown, + LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedEvictPods: true, + }, + } + + for _, item := range table { + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) + if err := nodeController.MonitorNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + podEvicted := false + for _, action := range item.fakeNodeHandler.Actions { + if action.Action == "delete-pod" { + podEvicted = true + } + } + if item.expectedEvictPods != podEvicted { + t.Errorf("expected pod eviction: %+v, got %+v", item.expectedEvictPods, podEvicted) + } + } +} + +func TestMonitorNodeStatusUpdateStatus(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler expectedRequestCount int - expectedEvictPods bool expectedNodes []*api.Node }{ // Node created long time ago, with no status. @@ -1067,7 +1215,6 @@ func TestMonitorNodeStatus(t *testing.T) { }, }, expectedRequestCount: 2, // List+Update - expectedEvictPods: true, expectedNodes: []*api.Node{ { ObjectMeta: api.ObjectMeta{ @@ -1078,7 +1225,7 @@ func TestMonitorNodeStatus(t *testing.T) { Conditions: []api.NodeCondition{ { Type: api.NodeReady, - Status: api.ConditionNone, + Status: api.ConditionUnknown, Reason: fmt.Sprintf("Kubelet never posted node status"), LastProbeTime: util.Time{}, LastTransitionTime: util.Time{}, @@ -1104,7 +1251,6 @@ func TestMonitorNodeStatus(t *testing.T) { }, }, expectedRequestCount: 1, // List - expectedEvictPods: false, expectedNodes: nil, }, // Node created long time ago, with status updated long time ago. 
@@ -1133,7 +1279,6 @@ func TestMonitorNodeStatus(t *testing.T) { }, }, expectedRequestCount: 2, // List+Update - expectedEvictPods: true, expectedNodes: []*api.Node{ { ObjectMeta: api.ObjectMeta{ @@ -1144,8 +1289,8 @@ func TestMonitorNodeStatus(t *testing.T) { Conditions: []api.NodeCondition{ { Type: api.NodeReady, - Status: api.ConditionNone, - Reason: fmt.Sprintf("Kubelet stop posting node status"), + Status: api.ConditionUnknown, + Reason: fmt.Sprintf("Kubelet stopped posting node status"), LastProbeTime: util.Time{}, LastTransitionTime: util.Time{}, }, @@ -1180,7 +1325,6 @@ func TestMonitorNodeStatus(t *testing.T) { }, }, expectedRequestCount: 1, // List - expectedEvictPods: false, expectedNodes: nil, }, } @@ -1209,15 +1353,6 @@ func TestMonitorNodeStatus(t *testing.T) { if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) } - podEvicted := false - for _, action := range item.fakeNodeHandler.Actions { - if action.Action == "delete-pod" { - podEvicted = true - } - } - if item.expectedEvictPods != podEvicted { - t.Errorf("expected pod eviction: %+v, got %+v", item.expectedEvictPods, podEvicted) - } } } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 75e27f27868..08caf999e3c 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -75,8 +75,18 @@ const ( initialNodeStatusUpdateFrequency = 100 * time.Millisecond nodeStatusUpdateFrequencyInc = 500 * time.Millisecond - // Node status update frequency and retry count. Note: be cautious when changing nodeStatusUpdateFrequency, - // it must work with nodecontroller.nodeMonitorGracePeriod. + // Node status update frequency and retry count. + // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod + // in nodecontroller. There are several constraints: + // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where + // N means number of retries allowed for kubelet to post node status. It is pointless + // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there + // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. + // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes + // longer for user to see up-to-date node status. + // 3. nodeStatusUpdateFrequency needs to be large enough for Kubelet to generate node + // status. Kubelet may fail to update node status reliablly if the value is too small, + // as it takes time to gather all necessary node information. 
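To make the constraint above concrete, here is a minimal, self-contained sketch; minGracePeriod is a hypothetical helper, not part of this change, and it simply derives the smallest grace period that still covers the kubelet's full retry window, using the 2-second frequency and 5 retries defined just below.

package main

import (
    "fmt"
    "time"
)

// minGracePeriod is a hypothetical helper (not from this patch) that derives the
// smallest node-controller grace period which still covers the kubelet's full
// retry window, per constraint 1 described in the comment above.
func minGracePeriod(statusUpdateFrequency time.Duration, retries int) time.Duration {
    return time.Duration(retries) * statusUpdateFrequency
}

func main() {
    // With the kubelet posting every 2s and retrying up to 5 times, the node
    // controller should not treat a status as stale much before ~10s have
    // passed, and the grace period must stay well below --pod_eviction_timeout.
    fmt.Println(minGracePeriod(2*time.Second, 5))
}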
nodeStatusUpdateFrequency = 2 * time.Second nodeStatusUpdateRetry = 5 ) @@ -1837,20 +1847,23 @@ func (kl *Kubelet) tryUpdateNodeStatus() error { node.Spec.Capacity = CapacityFromMachineInfo(info) } + currentTime := util.Now() newCondition := api.NodeCondition{ Type: api.NodeReady, Status: api.ConditionFull, Reason: fmt.Sprintf("kubelet is posting ready status"), - LastProbeTime: util.Now(), + LastProbeTime: currentTime, } updated := false for i := range node.Status.Conditions { if node.Status.Conditions[i].Type == api.NodeReady { + newCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime node.Status.Conditions[i] = newCondition updated = true } } if !updated { + newCondition.LastTransitionTime = currentTime node.Status.Conditions = append(node.Status.Conditions, newCondition) } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index f72140953cb..8c585854c71 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -3102,10 +3102,11 @@ func TestUpdateNewNodeStatus(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: fmt.Sprintf("kubelet is posting ready status"), - LastProbeTime: util.Time{}, + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("kubelet is posting ready status"), + LastProbeTime: util.Time{}, + LastTransitionTime: util.Time{}, }, }, NodeInfo: api.NodeSystemInfo{ @@ -3128,7 +3129,11 @@ func TestUpdateNewNodeStatus(t *testing.T) { if updatedNode.Status.Conditions[0].LastProbeTime.IsZero() { t.Errorf("unexpected zero last probe timestamp") } + if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() { + t.Errorf("unexpected zero last transition timestamp") + } updatedNode.Status.Conditions[0].LastProbeTime = util.Time{} + updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{} if !reflect.DeepEqual(expectedNode, updatedNode) { t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) } @@ -3151,10 +3156,11 @@ func TestUpdateExistingNodeStatus(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: fmt.Sprintf("kubelet is posting ready status"), - LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("kubelet is posting ready status"), + LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, }, }, @@ -3173,10 +3179,11 @@ func TestUpdateExistingNodeStatus(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: fmt.Sprintf("kubelet is posting ready status"), - LastProbeTime: util.Time{}, // placeholder + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("kubelet is posting ready status"), + LastProbeTime: util.Time{}, // placeholder + LastTransitionTime: util.Time{}, // placeholder }, }, NodeInfo: api.NodeSystemInfo{ @@ -3196,11 +3203,16 @@ func TestUpdateExistingNodeStatus(t *testing.T) { if !ok { t.Errorf("unexpected object type") } + // Expect LastProbeTime to be updated to Now, while LastTransitionTime to be the same. 
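That expectation follows from the LastTransitionTime merge rule added to tryUpdateNodeStatus above. A minimal standalone sketch of that rule, using simplified stand-ins for the api.NodeCondition and util.Time types and assuming only the behavior shown in this patch:

package main

import (
    "fmt"
    "time"
)

// condition is a simplified stand-in for api.NodeCondition, just to illustrate
// the merge rule exercised here; the real types live in pkg/api and pkg/util.
type condition struct {
    Status             string
    LastProbeTime      time.Time
    LastTransitionTime time.Time
}

// mergeCondition is a hypothetical helper showing the rule: every probe refreshes
// LastProbeTime, but LastTransitionTime only moves when the status actually changes.
func mergeCondition(old *condition, newStatus string, now time.Time) condition {
    merged := condition{Status: newStatus, LastProbeTime: now}
    if old != nil && old.Status == newStatus {
        merged.LastTransitionTime = old.LastTransitionTime
    } else {
        merged.LastTransitionTime = now
    }
    return merged
}

func main() {
    then := time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)
    old := &condition{Status: "Full", LastProbeTime: then, LastTransitionTime: then}
    updated := mergeCondition(old, "Full", time.Now())
    fmt.Println(updated.LastTransitionTime.Equal(then)) // true: status unchanged, transition time kept
}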
if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastProbeTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { - t.Errorf("expected \n%v\n, got \n%v", updatedNode.Status.Conditions[0].LastProbeTime, + t.Errorf("expected \n%v\n, got \n%v", util.Now(), util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) + } + if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { + t.Errorf("expected \n%v\n, got \n%v", updatedNode.Status.Conditions[0].LastTransitionTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) } updatedNode.Status.Conditions[0].LastProbeTime = util.Time{} + updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{} if !reflect.DeepEqual(expectedNode, updatedNode) { t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode) } From c5675b8924ace47762289bb4e6f1545ebca625f4 Mon Sep 17 00:00:00 2001 From: Deyuan Deng Date: Sun, 22 Mar 2015 21:10:35 -0400 Subject: [PATCH 4/4] Use fake time in nodecontroller unittest; rename receiver 's' to 'nc' --- .../app/controllermanager.go | 4 +- .../controller/nodecontroller.go | 197 ++++++------- .../controller/nodecontroller_test.go | 260 +++++++++--------- pkg/kubelet/kubelet.go | 10 +- 4 files changed, 238 insertions(+), 233 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 2c823b54b5d..a6d6b2c4f64 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -107,7 +107,9 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "The number of retries for initial node registration. Retry interval equals node_sync_period.") fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.") fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.") - fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controller send probes to kubelets and update NodeStatus.") + fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, ""+ + "If true, node controller sends probes to kubelet and updates NodeStatus."+ + "If false, Kubelet posts NodeStatus to API server.") // TODO: Discover these by pinging the host machines, and rip out these flags. // TODO: in the meantime, use resource.QuantityFlag() instead of these fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node") diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 5c4017767ae..75db78f2411 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -37,22 +37,20 @@ import ( const ( // The constant is used if sync_nodes_status=False. NodeController will not proactively // sync node status in this case, but will monitor node status updated from kubelet. If - // it doesn't receive update for this amount of time, it will start posting node NotReady - // condition. The amount of time when NodeController start evicting pods is controlled - // via flag 'pod_eviction_timeout'. + // it doesn't receive update for this amount of time, it will start posting "NodeReady== + // ConditionUnknown". The amount of time before which NodeController start evicting pods + // is controlled via flag 'pod_eviction_timeout'. 
// Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency // in kubelet. There are several constraints: // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where // N means number of retries allowed for kubelet to post node status. It is pointless // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. + // The constant must be less than podEvictionTimeout. // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes // longer for user to see up-to-date node status. - // 3. nodeStatusUpdateFrequency needs to be large enough for Kubelet to generate node - // status. Kubelet may fail to update node status reliablly if the value is too small, - // as it takes time to gather all necessary node information. nodeMonitorGracePeriod = 8 * time.Second - // The constant is used if sync_nodes_status=False, and for node startup. When node + // The constant is used if sync_nodes_status=False, only for node startup. When node // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period. nodeStartupGracePeriod = 30 * time.Second // The constant is used if sync_nodes_status=False. It controls NodeController monitoring @@ -77,7 +75,9 @@ type NodeController struct { kubeletClient client.KubeletClient registerRetryCount int podEvictionTimeout time.Duration - lookupIP func(host string) ([]net.IP, error) + // Method for easy mocking in unittest. + lookupIP func(host string) ([]net.IP, error) + now func() util.Time } // NewNodeController returns a new node controller to sync instances from cloudprovider. @@ -100,6 +100,7 @@ func NewNodeController( registerRetryCount: registerRetryCount, podEvictionTimeout: podEvictionTimeout, lookupIP: net.LookupIP, + now: util.Now, } } @@ -111,36 +112,38 @@ func NewNodeController( // 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider. // Node created here will only have specs. // 3. Depending on how k8s is configured, there are two ways of syncing the node status: -// 3.1 SyncProbedNodeStatus() is called periodically to sync node status for nodes in k8s cluster. -// 3.2 MonitorNodeStatus() is called periodically to monitor node status posted from kubelet. -func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) { +// 3.1 SyncProbedNodeStatus() is called periodically to trigger master to probe kubelet, +// and incorporate the resulting node status. +// 3.2 MonitorNodeStatus() is called periodically to incorporate the results of node status +// pushed from kubelet to master. +func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) { // Register intial set of nodes with their status set. 
var nodes *api.NodeList var err error - if s.isRunningCloudProvider() { + if nc.isRunningCloudProvider() { if syncNodeList { - if nodes, err = s.GetCloudNodesWithSpec(); err != nil { + if nodes, err = nc.GetCloudNodesWithSpec(); err != nil { glog.Errorf("Error loading initial node from cloudprovider: %v", err) } } else { nodes = &api.NodeList{} } } else { - if nodes, err = s.GetStaticNodesWithSpec(); err != nil { + if nodes, err = nc.GetStaticNodesWithSpec(); err != nil { glog.Errorf("Error loading initial static nodes: %v", err) } } - if nodes, err = s.PopulateAddresses(nodes); err != nil { + if nodes, err = nc.PopulateAddresses(nodes); err != nil { glog.Errorf("Error getting nodes ips: %v", err) } - if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil { + if err = nc.RegisterNodes(nodes, nc.registerRetryCount, period); err != nil { glog.Errorf("Error registering node list %+v: %v", nodes, err) } // Start syncing node list from cloudprovider. - if syncNodeList && s.isRunningCloudProvider() { + if syncNodeList && nc.isRunningCloudProvider() { go util.Forever(func() { - if err = s.SyncCloudNodes(); err != nil { + if err = nc.SyncCloudNodes(); err != nil { glog.Errorf("Error syncing cloud: %v", err) } }, period) @@ -149,13 +152,13 @@ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus // Start syncing or monitoring node status. if syncNodeStatus { go util.Forever(func() { - if err = s.SyncProbedNodeStatus(); err != nil { + if err = nc.SyncProbedNodeStatus(); err != nil { glog.Errorf("Error syncing status: %v", err) } }, period) } else { go util.Forever(func() { - if err = s.MonitorNodeStatus(); err != nil { + if err = nc.MonitorNodeStatus(); err != nil { glog.Errorf("Error monitoring node status: %v", err) } }, nodeMonitorPeriod) @@ -163,19 +166,19 @@ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus } // RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times. -func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error { +func (nc *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error { if len(nodes.Items) == 0 { return nil } registered := util.NewStringSet() - nodes = s.canonicalizeName(nodes) + nodes = nc.canonicalizeName(nodes) for i := 0; i < retryCount; i++ { for _, node := range nodes.Items { if registered.Has(node.Name) { continue } - _, err := s.kubeClient.Nodes().Create(&node) + _, err := nc.kubeClient.Nodes().Create(&node) if err == nil || apierrors.IsAlreadyExists(err) { registered.Insert(node.Name) glog.Infof("Registered node in registry: %s", node.Name) @@ -197,12 +200,12 @@ func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retr } // SyncCloudNodes synchronizes the list of instances from cloudprovider to master server. 
-func (s *NodeController) SyncCloudNodes() error { - matches, err := s.GetCloudNodesWithSpec() +func (nc *NodeController) SyncCloudNodes() error { + matches, err := nc.GetCloudNodesWithSpec() if err != nil { return err } - nodes, err := s.kubeClient.Nodes().List() + nodes, err := nc.kubeClient.Nodes().List() if err != nil { return err } @@ -216,7 +219,7 @@ func (s *NodeController) SyncCloudNodes() error { for _, node := range matches.Items { if _, ok := nodeMap[node.Name]; !ok { glog.Infof("Create node in registry: %s", node.Name) - _, err = s.kubeClient.Nodes().Create(&node) + _, err = nc.kubeClient.Nodes().Create(&node) if err != nil { glog.Errorf("Create node %s error: %v", node.Name, err) } @@ -227,23 +230,23 @@ func (s *NodeController) SyncCloudNodes() error { // Delete nodes which have been deleted from cloud, but not from kubernetes cluster. for nodeID := range nodeMap { glog.Infof("Delete node from registry: %s", nodeID) - err = s.kubeClient.Nodes().Delete(nodeID) + err = nc.kubeClient.Nodes().Delete(nodeID) if err != nil { glog.Errorf("Delete node %s error: %v", nodeID, err) } - s.deletePods(nodeID) + nc.deletePods(nodeID) } return nil } // SyncProbedNodeStatus synchronizes cluster nodes status to master server. -func (s *NodeController) SyncProbedNodeStatus() error { - nodes, err := s.kubeClient.Nodes().List() +func (nc *NodeController) SyncProbedNodeStatus() error { + nodes, err := nc.kubeClient.Nodes().List() if err != nil { return err } - nodes, err = s.PopulateNodesStatus(nodes) + nodes, err = nc.PopulateNodesStatus(nodes) if err != nil { return err } @@ -252,7 +255,7 @@ func (s *NodeController) SyncProbedNodeStatus() error { // useful after we introduce per-probe status field, e.g. 'LastProbeTime', which will // differ in every call of the sync loop. glog.V(2).Infof("updating node %v", node.Name) - _, err = s.kubeClient.Nodes().Update(&node) + _, err = nc.kubeClient.Nodes().Update(&node) if err != nil { glog.Errorf("error updating node %s: %v", node.Name, err) } @@ -261,25 +264,25 @@ func (s *NodeController) SyncProbedNodeStatus() error { } // PopulateNodesStatus populates node status for given list of nodes. -func (s *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) { +func (nc *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) { var wg sync.WaitGroup wg.Add(len(nodes.Items)) for i := range nodes.Items { go func(node *api.Node) { - node.Status.Conditions = s.DoCheck(node) - if err := s.populateNodeInfo(node); err != nil { + node.Status.Conditions = nc.DoCheck(node) + if err := nc.populateNodeInfo(node); err != nil { glog.Errorf("Can't collect information for node %s: %v", node.Name, err) } wg.Done() }(&nodes.Items[i]) } wg.Wait() - return s.PopulateAddresses(nodes) + return nc.PopulateAddresses(nodes) } // populateNodeInfo gets node info from kubelet and update the node. -func (s *NodeController) populateNodeInfo(node *api.Node) error { - nodeInfo, err := s.kubeletClient.GetNodeInfo(node.Name) +func (nc *NodeController) populateNodeInfo(node *api.Node) error { + nodeInfo, err := nc.kubeletClient.GetNodeInfo(node.Name) if err != nil { return err } @@ -291,96 +294,96 @@ func (s *NodeController) populateNodeInfo(node *api.Node) error { } // DoCheck performs various condition checks for given node. -func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition { +func (nc *NodeController) DoCheck(node *api.Node) []api.NodeCondition { var conditions []api.NodeCondition // Check Condition: NodeReady. 
TODO: More node conditions. - oldReadyCondition := s.getCondition(node, api.NodeReady) - newReadyCondition := s.checkNodeReady(node) - s.updateLastTransitionTime(oldReadyCondition, newReadyCondition) + oldReadyCondition := nc.getCondition(node, api.NodeReady) + newReadyCondition := nc.checkNodeReady(node) + nc.updateLastTransitionTime(oldReadyCondition, newReadyCondition) if newReadyCondition.Status != api.ConditionFull { // Node is not ready for this probe, we need to check if pods need to be deleted. - if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { + if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) { // As long as the node fails, we call delete pods to delete all pods. Node controller sync // is not a closed loop process, there is no feedback from other components regarding pod // status. Keep listing pods to sanity check if pods are all deleted makes more sense. - s.deletePods(node.Name) + nc.deletePods(node.Name) } } conditions = append(conditions, *newReadyCondition) // Check Condition: NodeSchedulable - oldSchedulableCondition := s.getCondition(node, api.NodeSchedulable) - newSchedulableCondition := s.checkNodeSchedulable(node) - s.updateLastTransitionTime(oldSchedulableCondition, newSchedulableCondition) + oldSchedulableCondition := nc.getCondition(node, api.NodeSchedulable) + newSchedulableCondition := nc.checkNodeSchedulable(node) + nc.updateLastTransitionTime(oldSchedulableCondition, newSchedulableCondition) conditions = append(conditions, *newSchedulableCondition) return conditions } // updateLastTransitionTime updates LastTransitionTime for the newCondition based on oldCondition. -func (s *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) { +func (nc *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) { if oldCondition != nil && oldCondition.Status == newCondition.Status { // If node status doesn't change, transition time is same as last time. newCondition.LastTransitionTime = oldCondition.LastTransitionTime } else { // Set transition time to Now() if node status changes or `oldCondition` is nil, which // happens only when the node is checked for the first time. - newCondition.LastTransitionTime = util.Now() + newCondition.LastTransitionTime = nc.now() } } // checkNodeSchedulable checks node schedulable condition, without transition timestamp set. -func (s *NodeController) checkNodeSchedulable(node *api.Node) *api.NodeCondition { +func (nc *NodeController) checkNodeSchedulable(node *api.Node) *api.NodeCondition { if node.Spec.Unschedulable { return &api.NodeCondition{ Type: api.NodeSchedulable, Status: api.ConditionNone, Reason: "User marked unschedulable during node create/update", - LastProbeTime: util.Now(), + LastProbeTime: nc.now(), } } else { return &api.NodeCondition{ Type: api.NodeSchedulable, Status: api.ConditionFull, Reason: "Node is schedulable by default", - LastProbeTime: util.Now(), + LastProbeTime: nc.now(), } } } // checkNodeReady checks raw node ready condition, without transition timestamp set. 
-func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition { - switch status, err := s.kubeletClient.HealthCheck(node.Name); { +func (nc *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition { + switch status, err := nc.kubeletClient.HealthCheck(node.Name); { case err != nil: glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err) return &api.NodeCondition{ Type: api.NodeReady, Status: api.ConditionUnknown, Reason: fmt.Sprintf("Node health check error: %v", err), - LastProbeTime: util.Now(), + LastProbeTime: nc.now(), } case status == probe.Failure: return &api.NodeCondition{ Type: api.NodeReady, Status: api.ConditionNone, Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"), - LastProbeTime: util.Now(), + LastProbeTime: nc.now(), } default: return &api.NodeCondition{ Type: api.NodeReady, Status: api.ConditionFull, Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"), - LastProbeTime: util.Now(), + LastProbeTime: nc.now(), } } } // PopulateAddresses queries Address for given list of nodes. -func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { - if s.isRunningCloudProvider() { - instances, ok := s.cloud.Instances() +func (nc *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { + if nc.isRunningCloudProvider() { + instances, ok := nc.cloud.Instances() if !ok { return nodes, ErrCloudInstance } @@ -401,7 +404,7 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} node.Status.Addresses = []api.NodeAddress{address} } else { - addrs, err := s.lookupIP(node.Name) + addrs, err := nc.lookupIP(node.Name) if err != nil { glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) } else if len(addrs) == 0 { @@ -416,11 +419,11 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, return nodes, nil } -// MonitorNodeStatus verifies node status are constantly updated by kubelet, and if -// not, post node NotReady status. It also evicts all pods if node is not ready for -// a long period of time. -func (s *NodeController) MonitorNodeStatus() error { - nodes, err := s.kubeClient.Nodes().List() +// MonitorNodeStatus verifies node status are constantly updated by kubelet, and if not, +// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or +// not reachable for a long period of time. +func (nc *NodeController) MonitorNodeStatus() error { + nodes, err := nc.kubeClient.Nodes().List() if err != nil { return err } @@ -428,7 +431,7 @@ func (s *NodeController) MonitorNodeStatus() error { var gracePeriod time.Duration var lastReadyCondition api.NodeCondition node := &nodes.Items[i] - readyCondition := s.getCondition(node, api.NodeReady) + readyCondition := nc.getCondition(node, api.NodeReady) if readyCondition == nil { // If ready condition is nil, then kubelet (or nodecontroller) never posted node status. // A fake ready condition is created, where LastProbeTime and LastTransitionTime is set @@ -447,7 +450,7 @@ func (s *NodeController) MonitorNodeStatus() error { } // Check last time when NodeReady was updated. 
- if util.Now().After(lastReadyCondition.LastProbeTime.Add(gracePeriod)) { + if nc.now().After(lastReadyCondition.LastProbeTime.Add(gracePeriod)) { // NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown // (regardless of its current value) in the master, without contacting kubelet. if readyCondition == nil { @@ -456,21 +459,21 @@ func (s *NodeController) MonitorNodeStatus() error { Type: api.NodeReady, Status: api.ConditionUnknown, Reason: fmt.Sprintf("Kubelet never posted node status"), - LastProbeTime: util.Now(), - LastTransitionTime: util.Now(), + LastProbeTime: node.CreationTimestamp, + LastTransitionTime: nc.now(), }) } else { - // Note here the out-dated condition can be the one posted by nodecontroller - // itself before. We keep posting the status to keep LastProbeTime fresh. - glog.V(2).Infof("node %v hasn't been updated for a while, last ready condition is %+v", node.Name, readyCondition) - readyCondition.Status = api.ConditionUnknown - readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status") - readyCondition.LastProbeTime = util.Now() + glog.V(2).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v", + node.Name, nc.now().Time.Sub(lastReadyCondition.LastProbeTime.Time), lastReadyCondition) if lastReadyCondition.Status != api.ConditionUnknown { - readyCondition.LastTransitionTime = util.Now() + readyCondition.Status = api.ConditionUnknown + readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status") + // LastProbeTime is the last time we heard from kubelet. + readyCondition.LastProbeTime = lastReadyCondition.LastProbeTime + readyCondition.LastTransitionTime = nc.now() } } - _, err = s.kubeClient.Nodes().Update(node) + _, err = nc.kubeClient.Nodes().Update(node) if err != nil { glog.Errorf("error updating node %s: %v", node.Name, err) } @@ -479,15 +482,15 @@ func (s *NodeController) MonitorNodeStatus() error { if readyCondition != nil { // Check eviction timeout. if lastReadyCondition.Status == api.ConditionNone && - util.Now().After(lastReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { + nc.now().After(lastReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) { // Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node. - s.deletePods(node.Name) + nc.deletePods(node.Name) } if lastReadyCondition.Status == api.ConditionUnknown && - util.Now().After(lastReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout-gracePeriod)) { + nc.now().After(lastReadyCondition.LastProbeTime.Add(nc.podEvictionTimeout-gracePeriod)) { // Same as above. Note however, since condition unknown is posted by node controller, which means we // need to substract monitoring grace period in order to get the real 'podEvictionTimeout'. - s.deletePods(node.Name) + nc.deletePods(node.Name) } } } @@ -497,12 +500,12 @@ func (s *NodeController) MonitorNodeStatus() error { // GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error // occurs, an empty NodeList will be returned with a non-nil error info. The method only // constructs spec fields for nodes. 
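The two eviction branches in MonitorNodeStatus above reduce to a simple rule: a node that kubelet reports NotReady is measured from its last transition, while a node the controller itself marked Unknown is measured from the last time kubelet was actually heard from, minus the monitoring grace period. shouldEvict below is a hypothetical distillation (the real code works on api.NodeCondition and the controller's injected clock), reusing the 30-minute eviction timeout and 8-second grace period that appear elsewhere in this patch:

package main

import (
    "fmt"
    "time"
)

// shouldEvict sketches the eviction decision: "None" corresponds to
// NodeReady==ConditionNone posted by kubelet, "Unknown" to
// NodeReady==ConditionUnknown posted by the node controller.
func shouldEvict(status string, lastProbe, lastTransition, now time.Time,
    podEvictionTimeout, gracePeriod time.Duration) bool {
    switch status {
    case "None":
        return now.After(lastTransition.Add(podEvictionTimeout))
    case "Unknown":
        // Subtract the grace period, since the Unknown condition was posted by
        // the controller only after the grace period had already elapsed.
        return now.After(lastProbe.Add(podEvictionTimeout - gracePeriod))
    default:
        return false
    }
}

func main() {
    now := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
    lastHeard := now.Add(-time.Hour) // kubelet last heard from an hour ago
    fmt.Println(shouldEvict("Unknown", lastHeard, lastHeard, now, 30*time.Minute, 8*time.Second)) // true
}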
-func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { +func (nc *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { result := &api.NodeList{} - for _, nodeID := range s.nodes { + for _, nodeID := range nc.nodes { node := api.Node{ ObjectMeta: api.ObjectMeta{Name: nodeID}, - Spec: api.NodeSpec{Capacity: s.staticResources.Capacity}, + Spec: api.NodeSpec{Capacity: nc.staticResources.Capacity}, } result.Items = append(result.Items, node) } @@ -512,13 +515,13 @@ func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { // GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error // occurs, an empty NodeList will be returned with a non-nil error info. The method only // constructs spec fields for nodes. -func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { +func (nc *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { result := &api.NodeList{} - instances, ok := s.cloud.Instances() + instances, ok := nc.cloud.Instances() if !ok { return result, ErrCloudInstance } - matches, err := instances.List(s.matchRE) + matches, err := instances.List(nc.matchRE) if err != nil { return result, err } @@ -530,7 +533,7 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { return nil, err } if resources == nil { - resources = s.staticResources + resources = nc.staticResources } if resources != nil { node.Spec.Capacity = resources.Capacity @@ -547,10 +550,10 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { } // deletePods will delete all pods from master running on given node. -func (s *NodeController) deletePods(nodeID string) error { +func (nc *NodeController) deletePods(nodeID string) error { glog.V(2).Infof("Delete all pods from %v", nodeID) // TODO: We don't yet have field selectors from client, see issue #1362. - pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) + pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) if err != nil { return err } @@ -559,7 +562,7 @@ func (s *NodeController) deletePods(nodeID string) error { continue } glog.V(2).Infof("Delete pod %v", pod.Name) - if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { + if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { glog.Errorf("Error deleting pod %v: %v", pod.Name, err) } } @@ -568,12 +571,12 @@ func (s *NodeController) deletePods(nodeID string) error { } // isRunningCloudProvider checks if cluster is running with cloud provider. -func (s *NodeController) isRunningCloudProvider() bool { - return s.cloud != nil && len(s.matchRE) > 0 +func (nc *NodeController) isRunningCloudProvider() bool { + return nc.cloud != nil && len(nc.matchRE) > 0 } // canonicalizeName takes a node list and lowercases all nodes' name. -func (s *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { +func (nc *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { for i := range nodes.Items { nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name) } @@ -582,7 +585,7 @@ func (s *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { // getCondition returns a condition object for the specific condition // type, nil if the condition is not set. 
-func (s *NodeController) getCondition(node *api.Node, conditionType api.NodeConditionType) *api.NodeCondition { +func (nc *NodeController) getCondition(node *api.Node, conditionType api.NodeConditionType) *api.NodeCondition { for i := range node.Status.Conditions { if node.Status.Conditions[i].Type == conditionType { return &node.Status.Conditions[i] diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index 6846ebfd2cf..ab731446580 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -563,6 +563,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) { } func TestNodeConditionsCheck(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) table := []struct { node *api.Node fakeKubeletClient *FakeKubeletClient @@ -578,14 +579,18 @@ func TestNodeConditionsCheck(t *testing.T) { }, expectedConditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, }, @@ -599,14 +604,18 @@ func TestNodeConditionsCheck(t *testing.T) { }, expectedConditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionNone, - Reason: "Node health check failed: kubelet /healthz endpoint returns not ok", + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: "Node health check failed: kubelet /healthz endpoint returns not ok", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, }, @@ -620,14 +629,18 @@ func TestNodeConditionsCheck(t *testing.T) { }, expectedConditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionUnknown, - Reason: "Node health check error: Error", + Type: api.NodeReady, + Status: api.ConditionUnknown, + Reason: "Node health check error: Error", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, { - Type: api.NodeSchedulable, - Status: api.ConditionNone, - Reason: "User marked unschedulable during node create/update", + Type: api.NodeSchedulable, + Status: api.ConditionNone, + Reason: "User marked unschedulable during node create/update", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, }, @@ -635,17 +648,8 @@ func TestNodeConditionsCheck(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute) + nodeController.now = func() util.Time { return fakeNow } conditions := nodeController.DoCheck(item.node) - for i := range conditions { - if conditions[i].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - if conditions[i].LastProbeTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - conditions[i].LastTransitionTime = 
util.Time{} - conditions[i].LastProbeTime = util.Time{} - } if !reflect.DeepEqual(item.expectedConditions, conditions) { t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions) } @@ -689,6 +693,7 @@ func TestPopulateNodeAddresses(t *testing.T) { } func TestSyncProbedNodeStatus(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) table := []struct { fakeNodeHandler *FakeNodeHandler fakeKubeletClient *FakeKubeletClient @@ -713,14 +718,18 @@ func TestSyncProbedNodeStatus(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, Addresses: []api.NodeAddress{ @@ -733,14 +742,18 @@ func TestSyncProbedNodeStatus(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, Addresses: []api.NodeAddress{ @@ -755,25 +768,13 @@ func TestSyncProbedNodeStatus(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) + nodeController.now = func() util.Time { return fakeNow } if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) } - for i := range item.fakeNodeHandler.UpdatedNodes { - conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions - for j := range conditions { - if conditions[j].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - if conditions[j].LastProbeTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - conditions[j].LastTransitionTime = util.Time{} - conditions[j].LastProbeTime = util.Time{} - } - } if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) } @@ -789,15 +790,17 @@ func TestSyncProbedNodeStatus(t *testing.T) { } func TestSyncProbedNodeStatusTransitionTime(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) table := []struct { - fakeNodeHandler *FakeNodeHandler - fakeKubeletClient *FakeKubeletClient - expectedRequestCount int - expectedTransitionTimeChange bool + fakeNodeHandler *FakeNodeHandler + 
fakeKubeletClient *FakeKubeletClient + expectedRequestCount int + expectedTransitionTime util.Time }{ { // Existing node is healthy, current probe is healthy too. // Existing node is schedulable, again explicitly mark node as schedulable. + // Expect transition time to stay the same as before. fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ { @@ -826,12 +829,13 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) { Status: probe.Success, Err: nil, }, - expectedRequestCount: 2, // List+Update - expectedTransitionTimeChange: false, + expectedRequestCount: 2, // List+Update + expectedTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { // Existing node is healthy, current probe is unhealthy. // Existing node is schedulable, mark node as unschedulable. + // Expect transition time to be now. fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ { @@ -860,34 +864,25 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) { Status: probe.Failure, Err: nil, }, - expectedRequestCount: 2, // List+Update - expectedTransitionTimeChange: true, + expectedRequestCount: 2, // List+Update + expectedTransitionTime: fakeNow, }, } for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) - nodeController.lookupIP = func(host string) ([]net.IP, error) { - return nil, fmt.Errorf("lookup %v: no such host", host) - } + nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) } + nodeController.now = func() util.Time { return fakeNow } if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) } - for i := range item.fakeNodeHandler.UpdatedNodes { - conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions - for j := range conditions { - condition := conditions[j] - if item.expectedTransitionTimeChange { - if !condition.LastTransitionTime.After(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { - t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime) - } - } else { - if !condition.LastTransitionTime.Equal(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { - t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime) - } + for _, node := range item.fakeNodeHandler.UpdatedNodes { + for _, condition := range node.Status.Conditions { + if !condition.LastTransitionTime.Time.Equal(item.expectedTransitionTime.Time) { + t.Errorf("expected last transition time %v, but got %v", item.expectedTransitionTime, condition.LastTransitionTime) } } } @@ -1029,9 +1024,7 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute) - nodeController.lookupIP = func(host string) ([]net.IP, error) { - return nil, fmt.Errorf("lookup %v: no such host", host) - } + nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) } if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -1045,9 +1038,11 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) { } func TestMonitorNodeStatusEvictPods(t *testing.T) { + 
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) table := []struct { fakeNodeHandler *FakeNodeHandler expectedEvictPods bool + evictionTimeout time.Duration }{ // Node created recently, with no status (happens only at cluster startup). { @@ -1056,7 +1051,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { { ObjectMeta: api.ObjectMeta{ Name: "node0", - CreationTimestamp: util.Now(), + CreationTimestamp: fakeNow, }, }, }, @@ -1064,9 +1059,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, }, }, + evictionTimeout: 30 * time.Minute, expectedEvictPods: false, }, - // Node created long time ago, with not ready status updated by kubelet for a short time. + // Node created long time ago, and kubelet posted NotReady for a short period of time. { fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ @@ -1078,10 +1074,11 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionNone, - LastProbeTime: util.Now(), - LastTransitionTime: util.Now(), + Type: api.NodeReady, + Status: api.ConditionNone, + // Node status has just been updated, and transited to NotReady for 10min. + LastProbeTime: util.Date(2015, 1, 1, 11, 59, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC), }, }, }, @@ -1091,9 +1088,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, }, }, + evictionTimeout: 30 * time.Minute, expectedEvictPods: false, }, - // Node created long time ago, with not ready status updated by kubelet for a long time. + // Node created long time ago, and kubelet posted NotReady for a long period of time. { fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ @@ -1105,10 +1103,11 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionUnknown, - LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Type: api.NodeReady, + Status: api.ConditionNone, + // Node status has just been updated, and transited to NotReady for 1hr. + LastProbeTime: util.Date(2015, 1, 1, 11, 59, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), }, }, }, @@ -1118,9 +1117,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, }, }, + evictionTimeout: 30 * time.Minute, expectedEvictPods: true, }, - // Node created long time ago, with unknown status updated by node controller for a short time. + // Node created long time ago, node controller posted Unknown for a short period of time. 
{ fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ @@ -1132,10 +1132,11 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionUnknown, - LastProbeTime: util.Now(), - LastTransitionTime: util.Now(), + Type: api.NodeReady, + Status: api.ConditionUnknown, + // Node status was updated by nodecontroller 10min ago + LastProbeTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC), }, }, }, @@ -1145,9 +1146,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, }, }, + evictionTimeout: 30 * time.Minute, expectedEvictPods: false, }, - // Node created long time ago, with unknown status updated by node controller for a long time. + // Node created long time ago, node controller posted Unknown for a long period of time. { fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ @@ -1159,10 +1161,11 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionUnknown, - LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), - LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + Type: api.NodeReady, + Status: api.ConditionUnknown, + // Node status was updated by nodecontroller 1hr ago + LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), }, }, }, @@ -1172,12 +1175,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, }, }, + evictionTimeout: 30 * time.Minute, expectedEvictPods: true, }, } for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout) + nodeController.now = func() util.Time { return fakeNow } if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -1194,12 +1199,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) { } func TestMonitorNodeStatusUpdateStatus(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) table := []struct { fakeNodeHandler *FakeNodeHandler expectedRequestCount int expectedNodes []*api.Node }{ - // Node created long time ago, with no status. + // Node created long time ago, without status: + // Expect Unknown status posted from node controller. { fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ @@ -1227,22 +1234,23 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { Type: api.NodeReady, Status: api.ConditionUnknown, Reason: fmt.Sprintf("Kubelet never posted node status"), - LastProbeTime: util.Time{}, - LastTransitionTime: util.Time{}, + LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: fakeNow, }, }, }, }, }, }, - // Node created recently, with no status. + // Node created recently, without status. + // Expect no action from node controller (within startup grace period). 
{ fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ { ObjectMeta: api.ObjectMeta{ Name: "node0", - CreationTimestamp: util.Now(), + CreationTimestamp: fakeNow, }, }, }, @@ -1253,7 +1261,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { expectedRequestCount: 1, // List expectedNodes: nil, }, - // Node created long time ago, with status updated long time ago. + // Node created long time ago, with status updated by kubelet exceeds grace period. + // Expect Unknown status posted from node controller. { fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ @@ -1265,10 +1274,11 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), + Type: api.NodeReady, + Status: api.ConditionFull, + // Node status hasn't been updated for 1hr. + LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), }, }, }, @@ -1291,8 +1301,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { Type: api.NodeReady, Status: api.ConditionUnknown, Reason: fmt.Sprintf("Kubelet stopped posting node status"), - LastProbeTime: util.Time{}, - LastTransitionTime: util.Time{}, + LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + LastTransitionTime: fakeNow, }, }, }, @@ -1300,6 +1310,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { }, }, // Node created long time ago, with status updated recently. + // Expect no action from node controller (within monitor grace period). { fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ @@ -1311,10 +1322,11 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - LastProbeTime: util.Now(), - LastTransitionTime: util.Time{}, + Type: api.NodeReady, + Status: api.ConditionFull, + // Node status has just been updated. 
+ LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, }, @@ -1331,25 +1343,13 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) + nodeController.now = func() util.Time { return fakeNow } if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) } - for i := range item.fakeNodeHandler.UpdatedNodes { - conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions - for j := range conditions { - if conditions[j].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - if conditions[j].LastProbeTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - conditions[j].LastTransitionTime = util.Time{} - conditions[j].LastProbeTime = util.Time{} - } - } if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 08caf999e3c..6fb8ac8f3fb 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -75,20 +75,20 @@ const ( initialNodeStatusUpdateFrequency = 100 * time.Millisecond nodeStatusUpdateFrequencyInc = 500 * time.Millisecond - // Node status update frequency and retry count. + // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master. // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod // in nodecontroller. There are several constraints: // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where // N means number of retries allowed for kubelet to post node status. It is pointless // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. - // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes - // longer for user to see up-to-date node status. - // 3. nodeStatusUpdateFrequency needs to be large enough for Kubelet to generate node + // The constant must be less than podEvictionTimeout. + // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node // status. Kubelet may fail to update node status reliablly if the value is too small, // as it takes time to gather all necessary node information. nodeStatusUpdateFrequency = 2 * time.Second - nodeStatusUpdateRetry = 5 + // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status failed. + nodeStatusUpdateRetry = 5 ) var (