diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 53371a31de4..a6d6b2c4f64 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -107,7 +107,9 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "The number of retries for initial node registration. Retry interval equals node_sync_period.") fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.") fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.") - fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controler send probes to kubelets and update NodeStatus.") + fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, ""+ + "If true, node controller sends probes to kubelet and updates NodeStatus."+ + "If false, Kubelet posts NodeStatus to API server.") // TODO: Discover these by pinging the host machines, and rip out these flags. // TODO: in the meantime, use resource.QuantityFlag() instead of these fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node") diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 9bc62e25d5d..745d61b3d11 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -51,7 +51,6 @@ type KubeletServer struct { SyncFrequency time.Duration FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration - StatusUpdateFrequency time.Duration ManifestURL string EnableServer bool Address util.IP @@ -85,13 +84,12 @@ type KubeletServer struct { // NewKubeletServer will create a new KubeletServer with default values. 
func NewKubeletServer() *KubeletServer { return &KubeletServer{ - SyncFrequency: 10 * time.Second, - FileCheckFrequency: 20 * time.Second, - HTTPCheckFrequency: 20 * time.Second, - StatusUpdateFrequency: 20 * time.Second, - EnableServer: true, - Address: util.IP(net.ParseIP("127.0.0.1")), - Port: ports.KubeletPort, + SyncFrequency: 10 * time.Second, + FileCheckFrequency: 20 * time.Second, + HTTPCheckFrequency: 20 * time.Second, + EnableServer: true, + Address: util.IP(net.ParseIP("127.0.0.1")), + Port: ports.KubeletPort, PodInfraContainerImage: kubelet.PodInfraContainerImage, RootDirectory: defaultRootDir, RegistryBurst: 10, @@ -112,7 +110,6 @@ func NewKubeletServer() *KubeletServer { func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.Config, "config", s.Config, "Path to the config file or directory of files") fs.DurationVar(&s.SyncFrequency, "sync_frequency", s.SyncFrequency, "Max period between synchronizing running containers and config") - fs.DurationVar(&s.StatusUpdateFrequency, "status_update_frequency", s.StatusUpdateFrequency, "Duration between posting node status to master") fs.DurationVar(&s.FileCheckFrequency, "file_check_frequency", s.FileCheckFrequency, "Duration between checking config files for new data") fs.DurationVar(&s.HTTPCheckFrequency, "http_check_frequency", s.HTTPCheckFrequency, "Duration between checking http for new data") fs.StringVar(&s.ManifestURL, "manifest_url", s.ManifestURL, "URL for accessing the container manifest") @@ -179,7 +176,6 @@ func (s *KubeletServer) Run(_ []string) error { RootDirectory: s.RootDirectory, ConfigFile: s.Config, ManifestURL: s.ManifestURL, - StatusUpdateFrequency: s.StatusUpdateFrequency, FileCheckFrequency: s.FileCheckFrequency, HTTPCheckFrequency: s.HTTPCheckFrequency, PodInfraContainerImage: s.PodInfraContainerImage, @@ -285,7 +281,6 @@ func SimpleKubelet(client *client.Client, EnableDebuggingHandlers: true, HTTPCheckFrequency: 1 * time.Second, FileCheckFrequency: 1 * time.Second, - StatusUpdateFrequency: 3 * time.Second, SyncFrequency: 3 * time.Second, MinimumGCAge: 10 * time.Second, MaxPerPodContainerCount: 5, @@ -380,7 +375,6 @@ type KubeletConfig struct { RootDirectory string ConfigFile string ManifestURL string - StatusUpdateFrequency time.Duration FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration Hostname string @@ -446,7 +440,6 @@ func createAndInitKubelet(kc *KubeletConfig, pc *config.PodConfig) (*kubelet.Kub kc.StreamingConnectionIdleTimeout, kc.Recorder, kc.CadvisorInterface, - kc.StatusUpdateFrequency, kc.ImageGCPolicy) if err != nil { diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index 696a1f22083..75db78f2411 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -34,6 +34,32 @@ import ( "github.com/golang/glog" ) +const ( + // The constant is used if sync_nodes_status=False. NodeController will not proactively + // sync node status in this case, but will monitor node status updated from kubelet. If + // it doesn't receive update for this amount of time, it will start posting "NodeReady== + // ConditionUnknown". The amount of time before which NodeController start evicting pods + // is controlled via flag 'pod_eviction_timeout'. + // Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency + // in kubelet. There are several constraints: + // 1. 
nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where + // N means number of retries allowed for kubelet to post node status. It is pointless + // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there + // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. + // The constant must be less than podEvictionTimeout. + // 2. nodeMonitorGracePeriod can't be too large for user experience - a larger value takes + // longer for the user to see up-to-date node status. + nodeMonitorGracePeriod = 8 * time.Second + // The constant is used if sync_nodes_status=False, only for node startup. When a node + // has just been created, e.g. at cluster bootstrap or node creation, we give it a longer grace period. + nodeStartupGracePeriod = 30 * time.Second + // The constant is used if sync_nodes_status=False. It controls the NodeController monitoring + // period, i.e. how often NodeController checks node status posted from kubelet. + // Theoretically, this value should be lower than nodeMonitorGracePeriod. + // TODO: Change node status monitor to watch based. + nodeMonitorPeriod = 5 * time.Second +) + var ( ErrRegistration = errors.New("unable to register all nodes.") ErrQueryIPAddress = errors.New("unable to query IP address.") @@ -49,12 +75,12 @@ type NodeController struct { kubeletClient client.KubeletClient registerRetryCount int podEvictionTimeout time.Duration - lookupIP func(host string) ([]net.IP, error) + // Method for easy mocking in unit tests. + lookupIP func(host string) ([]net.IP, error) + now func() util.Time } // NewNodeController returns a new node controller to sync instances from cloudprovider. -// TODO: NodeController health checker should be a separate package other than -// kubeletclient, node health check != kubelet health check. func NewNodeController( cloud cloudprovider.Interface, matchRE string, @@ -74,85 +100,85 @@ func NewNodeController( registerRetryCount: registerRetryCount, podEvictionTimeout: podEvictionTimeout, lookupIP: net.LookupIP, + now: util.Now, } } -// Run creates initial node list and start syncing instances from cloudprovider if any. -// It also starts syncing cluster node status. +// Run creates the initial node list and starts syncing instances from cloudprovider, if any. +// It also starts syncing or monitoring cluster node status. // 1. RegisterNodes() is called only once to register all initial nodes (from cloudprovider // or from command line flag). To make cluster bootstrap faster, node controller populates // node addresses. -// 2. SyncCloud() is called periodically (if enabled) to sync instances from cloudprovider. +// 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider. // Node created here will only have specs. -// 3. SyncNodeStatus() is called periodically (if enabled) to sync node status for nodes in -// k8s cluster. +// 3. Depending on how k8s is configured, there are two ways of syncing the node status: +// 3.1 SyncProbedNodeStatus() is called periodically to trigger master to probe kubelet, +// and incorporate the resulting node status. +// 3.2 MonitorNodeStatus() is called periodically to incorporate the results of node status +// pushed from kubelet to master. +func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) { // Register initial set of nodes with their status set.
var nodes *api.NodeList var err error - if s.isRunningCloudProvider() { + if nc.isRunningCloudProvider() { if syncNodeList { - nodes, err = s.GetCloudNodesWithSpec() - if err != nil { + if nodes, err = nc.GetCloudNodesWithSpec(); err != nil { glog.Errorf("Error loading initial node from cloudprovider: %v", err) } } else { nodes = &api.NodeList{} } } else { - nodes, err = s.GetStaticNodesWithSpec() - if err != nil { + if nodes, err = nc.GetStaticNodesWithSpec(); err != nil { glog.Errorf("Error loading initial static nodes: %v", err) } } - nodes, err = s.PopulateAddresses(nodes) - if err != nil { + if nodes, err = nc.PopulateAddresses(nodes); err != nil { glog.Errorf("Error getting nodes ips: %v", err) } - if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil { + if err = nc.RegisterNodes(nodes, nc.registerRetryCount, period); err != nil { glog.Errorf("Error registering node list %+v: %v", nodes, err) } // Start syncing node list from cloudprovider. - if syncNodeList && s.isRunningCloudProvider() { + if syncNodeList && nc.isRunningCloudProvider() { go util.Forever(func() { - if err = s.SyncCloud(); err != nil { + if err = nc.SyncCloudNodes(); err != nil { glog.Errorf("Error syncing cloud: %v", err) } }, period) } + // Start syncing or monitoring node status. if syncNodeStatus { - // Start syncing node status. go util.Forever(func() { - if err = s.SyncNodeStatus(); err != nil { + if err = nc.SyncProbedNodeStatus(); err != nil { glog.Errorf("Error syncing status: %v", err) } }, period) } else { - // Start checking node reachability and evicting timeouted pods. go util.Forever(func() { - if err = s.EvictTimeoutedPods(); err != nil { - glog.Errorf("Error evicting timeouted pods: %v", err) + if err = nc.MonitorNodeStatus(); err != nil { + glog.Errorf("Error monitoring node status: %v", err) } - }, period) + }, nodeMonitorPeriod) } } // RegisterNodes registers the given list of nodes, it keeps retrying for `retryCount` times. -func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error { +func (nc *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error { if len(nodes.Items) == 0 { return nil } registered := util.NewStringSet() - nodes = s.canonicalizeName(nodes) + nodes = nc.canonicalizeName(nodes) for i := 0; i < retryCount; i++ { for _, node := range nodes.Items { if registered.Has(node.Name) { continue } - _, err := s.kubeClient.Nodes().Create(&node) + _, err := nc.kubeClient.Nodes().Create(&node) if err == nil || apierrors.IsAlreadyExists(err) { registered.Insert(node.Name) glog.Infof("Registered node in registry: %s", node.Name) @@ -173,13 +199,13 @@ func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retr } } -// SyncCloud synchronizes the list of instances from cloudprovider to master server. -func (s *NodeController) SyncCloud() error { - matches, err := s.GetCloudNodesWithSpec() +// SyncCloudNodes synchronizes the list of instances from cloudprovider to master server. 
+func (nc *NodeController) SyncCloudNodes() error { + matches, err := nc.GetCloudNodesWithSpec() if err != nil { return err } - nodes, err := s.kubeClient.Nodes().List() + nodes, err := nc.kubeClient.Nodes().List() if err != nil { return err } @@ -193,7 +219,7 @@ func (s *NodeController) SyncCloud() error { for _, node := range matches.Items { if _, ok := nodeMap[node.Name]; !ok { glog.Infof("Create node in registry: %s", node.Name) - _, err = s.kubeClient.Nodes().Create(&node) + _, err = nc.kubeClient.Nodes().Create(&node) if err != nil { glog.Errorf("Create node %s error: %v", node.Name, err) } @@ -204,24 +230,23 @@ func (s *NodeController) SyncCloud() error { // Delete nodes which have been deleted from cloud, but not from kubernetes cluster. for nodeID := range nodeMap { glog.Infof("Delete node from registry: %s", nodeID) - err = s.kubeClient.Nodes().Delete(nodeID) + err = nc.kubeClient.Nodes().Delete(nodeID) if err != nil { glog.Errorf("Delete node %s error: %v", nodeID, err) } - s.deletePods(nodeID) + nc.deletePods(nodeID) } return nil } -// SyncNodeStatus synchronizes cluster nodes status to master server. -func (s *NodeController) SyncNodeStatus() error { - nodes, err := s.kubeClient.Nodes().List() +// SyncProbedNodeStatus synchronizes cluster nodes status to master server. +func (nc *NodeController) SyncProbedNodeStatus() error { + nodes, err := nc.kubeClient.Nodes().List() if err != nil { return err } - nodes = s.UpdateNodesStatus(nodes) - nodes, err = s.PopulateAddresses(nodes) + nodes, err = nc.PopulateNodesStatus(nodes) if err != nil { return err } @@ -230,7 +255,7 @@ func (s *NodeController) SyncNodeStatus() error { // useful after we introduce per-probe status field, e.g. 'LastProbeTime', which will // differ in every call of the sync loop. glog.V(2).Infof("updating node %v", node.Name) - _, err = s.kubeClient.Nodes().Update(&node) + _, err = nc.kubeClient.Nodes().Update(&node) if err != nil { glog.Errorf("error updating node %s: %v", node.Name, err) } @@ -238,37 +263,127 @@ func (s *NodeController) SyncNodeStatus() error { return nil } -// EvictTimeoutedPods verifies if nodes are reachable by checking the time of last probe -// and deletes pods from not reachable nodes. -func (s *NodeController) EvictTimeoutedPods() error { - nodes, err := s.kubeClient.Nodes().List() +// PopulateNodesStatus populates node status for given list of nodes. +func (nc *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) { + var wg sync.WaitGroup + wg.Add(len(nodes.Items)) + for i := range nodes.Items { + go func(node *api.Node) { + node.Status.Conditions = nc.DoCheck(node) + if err := nc.populateNodeInfo(node); err != nil { + glog.Errorf("Can't collect information for node %s: %v", node.Name, err) + } + wg.Done() + }(&nodes.Items[i]) + } + wg.Wait() + return nc.PopulateAddresses(nodes) +} + +// populateNodeInfo gets node info from kubelet and update the node. 
+func (nc *NodeController) populateNodeInfo(node *api.Node) error { + nodeInfo, err := nc.kubeletClient.GetNodeInfo(node.Name) if err != nil { return err } - for _, node := range nodes.Items { - if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) { - s.deletePods(node.Name) - } + for key, value := range nodeInfo.Capacity { + node.Spec.Capacity[key] = value } + node.Status.NodeInfo = nodeInfo.NodeSystemInfo return nil } -func latestReadyTime(node *api.Node) util.Time { - readyTime := node.ObjectMeta.CreationTimestamp - for _, condition := range node.Status.Conditions { - if condition.Type == api.NodeReady && - condition.Status == api.ConditionFull && - condition.LastProbeTime.After(readyTime.Time) { - readyTime = condition.LastProbeTime +// DoCheck performs various condition checks for given node. +func (nc *NodeController) DoCheck(node *api.Node) []api.NodeCondition { + var conditions []api.NodeCondition + + // Check Condition: NodeReady. TODO: More node conditions. + oldReadyCondition := nc.getCondition(node, api.NodeReady) + newReadyCondition := nc.checkNodeReady(node) + nc.updateLastTransitionTime(oldReadyCondition, newReadyCondition) + if newReadyCondition.Status != api.ConditionFull { + // Node is not ready for this probe, we need to check if pods need to be deleted. + if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) { + // As long as the node fails, we call delete pods to delete all pods. Node controller sync + // is not a closed loop process, there is no feedback from other components regarding pod + // status. Keep listing pods to sanity check if pods are all deleted makes more sense. + nc.deletePods(node.Name) + } + } + conditions = append(conditions, *newReadyCondition) + + // Check Condition: NodeSchedulable + oldSchedulableCondition := nc.getCondition(node, api.NodeSchedulable) + newSchedulableCondition := nc.checkNodeSchedulable(node) + nc.updateLastTransitionTime(oldSchedulableCondition, newSchedulableCondition) + conditions = append(conditions, *newSchedulableCondition) + + return conditions +} + +// updateLastTransitionTime updates LastTransitionTime for the newCondition based on oldCondition. +func (nc *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) { + if oldCondition != nil && oldCondition.Status == newCondition.Status { + // If node status doesn't change, transition time is same as last time. + newCondition.LastTransitionTime = oldCondition.LastTransitionTime + } else { + // Set transition time to Now() if node status changes or `oldCondition` is nil, which + // happens only when the node is checked for the first time. + newCondition.LastTransitionTime = nc.now() + } +} + +// checkNodeSchedulable checks node schedulable condition, without transition timestamp set. +func (nc *NodeController) checkNodeSchedulable(node *api.Node) *api.NodeCondition { + if node.Spec.Unschedulable { + return &api.NodeCondition{ + Type: api.NodeSchedulable, + Status: api.ConditionNone, + Reason: "User marked unschedulable during node create/update", + LastProbeTime: nc.now(), + } + } else { + return &api.NodeCondition{ + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: nc.now(), + } + } +} + +// checkNodeReady checks raw node ready condition, without transition timestamp set. 
+func (nc *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition { + switch status, err := nc.kubeletClient.HealthCheck(node.Name); { + case err != nil: + glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err) + return &api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionUnknown, + Reason: fmt.Sprintf("Node health check error: %v", err), + LastProbeTime: nc.now(), + } + case status == probe.Failure: + return &api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"), + LastProbeTime: nc.now(), + } + default: + return &api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"), + LastProbeTime: nc.now(), + } } - return readyTime } // PopulateAddresses queries Address for given list of nodes. -func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { - if s.isRunningCloudProvider() { - instances, ok := s.cloud.Instances() +func (nc *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { + if nc.isRunningCloudProvider() { + instances, ok := nc.cloud.Instances() if !ok { return nodes, ErrCloudInstance } @@ -289,7 +404,7 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} node.Status.Addresses = []api.NodeAddress{address} } else { - addrs, err := s.lookupIP(node.Name) + addrs, err := nc.lookupIP(node.Name) if err != nil { glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) } else if len(addrs) == 0 { @@ -304,153 +419,93 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, return nodes, nil } -// UpdateNodesStatus performs various condition checks for given list of nodes. -func (s *NodeController) UpdateNodesStatus(nodes *api.NodeList) *api.NodeList { - var wg sync.WaitGroup - wg.Add(len(nodes.Items)) +// MonitorNodeStatus verifies that node status is constantly updated by kubelet, and if not, +// posts "NodeReady==ConditionUnknown". It also evicts all pods if the node is not ready or +// not reachable for a long period of time. +func (nc *NodeController) MonitorNodeStatus() error { + nodes, err := nc.kubeClient.Nodes().List() + if err != nil { + return err + } for i := range nodes.Items { - go func(node *api.Node) { - node.Status.Conditions = s.DoCheck(node) - if err := s.updateNodeInfo(node); err != nil { - glog.Errorf("Can't collect information for node %s: %v", node.Name, err) + var gracePeriod time.Duration + var lastReadyCondition api.NodeCondition + node := &nodes.Items[i] + readyCondition := nc.getCondition(node, api.NodeReady) + if readyCondition == nil { + // If ready condition is nil, then kubelet (or nodecontroller) never posted node status. + // A fake ready condition is created, where LastProbeTime and LastTransitionTime are set + // to node.CreationTimestamp to avoid handling the corner case. + lastReadyCondition = api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionUnknown, + LastProbeTime: node.CreationTimestamp, + LastTransitionTime: node.CreationTimestamp, } - wg.Done() - }(&nodes.Items[i]) - } - wg.Wait() - return nodes -} + gracePeriod = nodeStartupGracePeriod + } else { + // If ready condition is not nil, make a copy of it, since we may modify it in place later.
+ lastReadyCondition = *readyCondition + gracePeriod = nodeMonitorGracePeriod + } -func (s *NodeController) updateNodeInfo(node *api.Node) error { - nodeInfo, err := s.kubeletClient.GetNodeInfo(node.Name) - if err != nil { - return err - } - for key, value := range nodeInfo.Capacity { - node.Spec.Capacity[key] = value - } - node.Status.NodeInfo = nodeInfo.NodeSystemInfo - return nil -} + // Check last time when NodeReady was updated. + if nc.now().After(lastReadyCondition.LastProbeTime.Add(gracePeriod)) { + // NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown + // (regardless of its current value) in the master, without contacting kubelet. + if readyCondition == nil { + glog.V(2).Infof("node %v is never updated by kubelet", node.Name) + node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{ + Type: api.NodeReady, + Status: api.ConditionUnknown, + Reason: fmt.Sprintf("Kubelet never posted node status"), + LastProbeTime: node.CreationTimestamp, + LastTransitionTime: nc.now(), + }) + } else { + glog.V(2).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v", + node.Name, nc.now().Time.Sub(lastReadyCondition.LastProbeTime.Time), lastReadyCondition) + if lastReadyCondition.Status != api.ConditionUnknown { + readyCondition.Status = api.ConditionUnknown + readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status") + // LastProbeTime is the last time we heard from kubelet. + readyCondition.LastProbeTime = lastReadyCondition.LastProbeTime + readyCondition.LastTransitionTime = nc.now() + } + } + _, err = nc.kubeClient.Nodes().Update(node) + if err != nil { + glog.Errorf("error updating node %s: %v", node.Name, err) + } + } -// DoCheck performs various condition checks for given node. -func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition { - var conditions []api.NodeCondition - - // Check Condition: NodeReady. TODO: More node conditions. - oldReadyCondition := s.getCondition(node, api.NodeReady) - newReadyCondition := s.checkNodeReady(node) - s.updateLastTransitionTime(oldReadyCondition, newReadyCondition) - - if newReadyCondition.Status != api.ConditionFull { - // Node is not ready for this probe, we need to check if pods need to be deleted. - if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { - // As long as the node fails, we call delete pods to delete all pods. Node controller sync - // is not a closed loop process, there is no feedback from other components regarding pod - // status. Keep listing pods to sanity check if pods are all deleted makes more sense. - s.deletePods(node.Name) + if readyCondition != nil { + // Check eviction timeout. + if lastReadyCondition.Status == api.ConditionNone && + nc.now().After(lastReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) { + // Node has stayed NotReady for at least 'podEvictionTimeout' - evict all pods on the unhealthy node. + nc.deletePods(node.Name) + } + if lastReadyCondition.Status == api.ConditionUnknown && + nc.now().After(lastReadyCondition.LastProbeTime.Add(nc.podEvictionTimeout-gracePeriod)) { + // Same as above. Note, however, that the Unknown condition is posted by the node controller, so we + // need to subtract the monitoring grace period in order to get the real 'podEvictionTimeout'.
+ nc.deletePods(node.Name) + } } } - conditions = append(conditions, *newReadyCondition) - - // Check Condition: NodeSchedulable - oldSchedulableCondition := s.getCondition(node, api.NodeSchedulable) - newSchedulableCondition := s.checkNodeSchedulable(node) - s.updateLastTransitionTime(oldSchedulableCondition, newSchedulableCondition) - conditions = append(conditions, *newSchedulableCondition) - - return conditions -} - -// updateLastTransitionTime updates LastTransitionTime for the newCondition based on oldCondition. -func (s *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) { - if oldCondition != nil && oldCondition.Status == newCondition.Status { - // If node status doesn't change, transition time is same as last time. - newCondition.LastTransitionTime = oldCondition.LastTransitionTime - } else { - // Set transition time to Now() if node status changes or `oldCondition` is nil, which - // happens only when the node is checked for the first time. - newCondition.LastTransitionTime = util.Now() - } -} - -// checkNodeSchedulable checks node schedulable condition, without transition timestamp set. -func (s *NodeController) checkNodeSchedulable(node *api.Node) *api.NodeCondition { - if node.Spec.Unschedulable { - return &api.NodeCondition{ - Type: api.NodeSchedulable, - Status: api.ConditionNone, - Reason: "User marked unschedulable during node create/update", - LastProbeTime: util.Now(), - } - } else { - return &api.NodeCondition{ - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", - LastProbeTime: util.Now(), - } - } -} - -// checkNodeReady checks raw node ready condition, without transition timestamp set. -func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition { - switch status, err := s.kubeletClient.HealthCheck(node.Name); { - case err != nil: - glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err) - return &api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionUnknown, - Reason: fmt.Sprintf("Node health check error: %v", err), - LastProbeTime: util.Now(), - } - case status == probe.Failure: - return &api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionNone, - Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"), - LastProbeTime: util.Now(), - } - default: - return &api.NodeCondition{ - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"), - LastProbeTime: util.Now(), - } - } -} - -// deletePods will delete all pods from master running on given node. -func (s *NodeController) deletePods(nodeID string) error { - glog.V(2).Infof("Delete all pods from %v", nodeID) - // TODO: We don't yet have field selectors from client, see issue #1362. - pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) - if err != nil { - return err - } - for _, pod := range pods.Items { - if pod.Status.Host != nodeID { - continue - } - glog.V(2).Infof("Delete pod %v", pod.Name) - if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { - glog.Errorf("Error deleting pod %v: %v", pod.Name, err) - } - } - return nil } // GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error -// occurs, an empty NodeList will be returned with a non-nil error info. The -// method only constructs spec fields for nodes. 
-func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { +// occurs, an empty NodeList will be returned with a non-nil error info. The method only +// constructs spec fields for nodes. +func (nc *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { result := &api.NodeList{} - for _, nodeID := range s.nodes { + for _, nodeID := range nc.nodes { node := api.Node{ ObjectMeta: api.ObjectMeta{Name: nodeID}, - Spec: api.NodeSpec{Capacity: s.staticResources.Capacity}, + Spec: api.NodeSpec{Capacity: nc.staticResources.Capacity}, } result.Items = append(result.Items, node) } @@ -458,15 +513,15 @@ func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { } // GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error -// occurs, an empty NodeList will be returned with a non-nil error info. The -// method only constructs spec fields for nodes. -func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { +// occurs, an empty NodeList will be returned with a non-nil error info. The method only +// constructs spec fields for nodes. +func (nc *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { result := &api.NodeList{} - instances, ok := s.cloud.Instances() + instances, ok := nc.cloud.Instances() if !ok { return result, ErrCloudInstance } - matches, err := instances.List(s.matchRE) + matches, err := instances.List(nc.matchRE) if err != nil { return result, err } @@ -478,7 +533,7 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { return nil, err } if resources == nil { - resources = s.staticResources + resources = nc.staticResources } if resources != nil { node.Spec.Capacity = resources.Capacity @@ -494,13 +549,34 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { return result, nil } +// deletePods will delete all pods from master running on given node. +func (nc *NodeController) deletePods(nodeID string) error { + glog.V(2).Infof("Delete all pods from %v", nodeID) + // TODO: We don't yet have field selectors from client, see issue #1362. + pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) + if err != nil { + return err + } + for _, pod := range pods.Items { + if pod.Status.Host != nodeID { + continue + } + glog.V(2).Infof("Delete pod %v", pod.Name) + if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { + glog.Errorf("Error deleting pod %v: %v", pod.Name, err) + } + } + + return nil +} + // isRunningCloudProvider checks if cluster is running with cloud provider. -func (s *NodeController) isRunningCloudProvider() bool { - return s.cloud != nil && len(s.matchRE) > 0 +func (nc *NodeController) isRunningCloudProvider() bool { + return nc.cloud != nil && len(nc.matchRE) > 0 } // canonicalizeName takes a node list and lowercases all nodes' name. -func (s *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { +func (nc *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { for i := range nodes.Items { nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name) } @@ -509,7 +585,7 @@ func (s *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { // getCondition returns a condition object for the specific condition // type, nil if the condition is not set. 
-func (s *NodeController) getCondition(node *api.Node, conditionType api.NodeConditionType) *api.NodeCondition { +func (nc *NodeController) getCondition(node *api.Node, conditionType api.NodeConditionType) *api.NodeCondition { for i := range node.Status.Conditions { if node.Status.Conditions[i].Type == conditionType { return &node.Status.Conditions[i] diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index 20bd2f47383..ab731446580 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -39,7 +39,7 @@ import ( ) // FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It -// allows test cases to have fine-grained control over mock behaviors. We alos need +// allows test cases to have fine-grained control over mock behaviors. We also need // PodsInterface and PodInterface to test list & delet pods, which is implemented in // the embeded client.Fake field. type FakeNodeHandler struct { @@ -377,7 +377,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) { } } -func TestSyncCloud(t *testing.T) { +func TestSyncCloudNodes(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeCloud *fake_cloud.FakeCloud @@ -464,7 +464,7 @@ func TestSyncCloud(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) - if err := nodeController.SyncCloud(); err != nil { + if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { @@ -485,7 +485,7 @@ func TestSyncCloud(t *testing.T) { } } -func TestSyncCloudDeletePods(t *testing.T) { +func TestSyncCloudNodesEvictPods(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeCloud *fake_cloud.FakeCloud @@ -546,7 +546,7 @@ func TestSyncCloudDeletePods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) - if err := nodeController.SyncCloud(); err != nil { + if err := nodeController.SyncCloudNodes(); err != nil { t.Errorf("unexpected error: %v", err) } if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { @@ -563,6 +563,7 @@ func TestSyncCloudDeletePods(t *testing.T) { } func TestNodeConditionsCheck(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) table := []struct { node *api.Node fakeKubeletClient *FakeKubeletClient @@ -578,14 +579,18 @@ func TestNodeConditionsCheck(t *testing.T) { }, expectedConditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, }, @@ -599,14 +604,18 @@ func TestNodeConditionsCheck(t *testing.T) { }, expectedConditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionNone, - Reason: 
"Node health check failed: kubelet /healthz endpoint returns not ok", + Type: api.NodeReady, + Status: api.ConditionNone, + Reason: "Node health check failed: kubelet /healthz endpoint returns not ok", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, }, @@ -620,14 +629,18 @@ func TestNodeConditionsCheck(t *testing.T) { }, expectedConditions: []api.NodeCondition{ { - Type: api.NodeReady, - Status: api.ConditionUnknown, - Reason: "Node health check error: Error", + Type: api.NodeReady, + Status: api.ConditionUnknown, + Reason: "Node health check error: Error", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, { - Type: api.NodeSchedulable, - Status: api.ConditionNone, - Reason: "User marked unschedulable during node create/update", + Type: api.NodeSchedulable, + Status: api.ConditionNone, + Reason: "User marked unschedulable during node create/update", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, }, }, }, @@ -635,17 +648,8 @@ func TestNodeConditionsCheck(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute) + nodeController.now = func() util.Time { return fakeNow } conditions := nodeController.DoCheck(item.node) - for i := range conditions { - if conditions[i].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - if conditions[i].LastProbeTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - conditions[i].LastTransitionTime = util.Time{} - conditions[i].LastProbeTime = util.Time{} - } if !reflect.DeepEqual(item.expectedConditions, conditions) { t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions) } @@ -688,16 +692,115 @@ func TestPopulateNodeAddresses(t *testing.T) { } } -func TestSyncNodeStatusTransitionTime(t *testing.T) { +func TestSyncProbedNodeStatus(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) table := []struct { - fakeNodeHandler *FakeNodeHandler - fakeKubeletClient *FakeKubeletClient - expectedRequestCount int - expectedTransitionTimeChange bool + fakeNodeHandler *FakeNodeHandler + fakeKubeletClient *FakeKubeletClient + fakeCloud *fake_cloud.FakeCloud + expectedNodes []*api.Node + expectedRequestCount int + }{ + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + }, + fakeKubeletClient: &FakeKubeletClient{ + Status: probe.Success, + Err: nil, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, + }, + expectedNodes: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, + }, + { + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, + }, + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + }, + }, + }, + { + ObjectMeta: api.ObjectMeta{Name: "node1"}, + Status: 
api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, + }, + { + Type: api.NodeSchedulable, + Status: api.ConditionFull, + Reason: "Node is schedulable by default", + LastProbeTime: fakeNow, + LastTransitionTime: fakeNow, + }, + }, + Addresses: []api.NodeAddress{ + {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + }, + }, + }, + }, + expectedRequestCount: 3, // List + 2xUpdate + }, + } + + for _, item := range table { + nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) + nodeController.now = func() util.Time { return fakeNow } + if err := nodeController.SyncProbedNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { + t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) + } + // Second sync will also update the node. + item.fakeNodeHandler.RequestCount = 0 + if err := nodeController.SyncProbedNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + } +} + +func TestSyncProbedNodeStatusTransitionTime(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) + table := []struct { + fakeNodeHandler *FakeNodeHandler + fakeKubeletClient *FakeKubeletClient + expectedRequestCount int + expectedTransitionTime util.Time }{ { // Existing node is healthy, current probe is healthy too. // Existing node is schedulable, again explicitly mark node as schedulable. + // Expect transition time to stay the same as before. fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ { @@ -726,12 +829,13 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) { Status: probe.Success, Err: nil, }, - expectedRequestCount: 2, // List+Update - expectedTransitionTimeChange: false, + expectedRequestCount: 2, // List+Update + expectedTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, { // Existing node is healthy, current probe is unhealthy. // Existing node is schedulable, mark node as unschedulable. + // Expect transition time to be now. 
fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ { @@ -760,153 +864,32 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) { Status: probe.Failure, Err: nil, }, - expectedRequestCount: 2, // List+Update - expectedTransitionTimeChange: true, + expectedRequestCount: 2, // List+Update + expectedTransitionTime: fakeNow, }, } for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) - nodeController.lookupIP = func(host string) ([]net.IP, error) { - return nil, fmt.Errorf("lookup %v: no such host", host) - } - if err := nodeController.SyncNodeStatus(); err != nil { + nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) } + nodeController.now = func() util.Time { return fakeNow } + if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) } - for i := range item.fakeNodeHandler.UpdatedNodes { - conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions - for j := range conditions { - condition := conditions[j] - if item.expectedTransitionTimeChange { - if !condition.LastTransitionTime.After(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { - t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime) - } - } else { - if !condition.LastTransitionTime.Equal(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { - t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime) - } + for _, node := range item.fakeNodeHandler.UpdatedNodes { + for _, condition := range node.Status.Conditions { + if !condition.LastTransitionTime.Time.Equal(item.expectedTransitionTime.Time) { + t.Errorf("expected last transition time %v, but got %v", item.expectedTransitionTime, condition.LastTransitionTime) } } } } } -func TestEvictTimeoutedPods(t *testing.T) { - table := []struct { - fakeNodeHandler *FakeNodeHandler - expectedRequestCount int - expectedActions []client.FakeAction - }{ - // Node created long time ago, with no status. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, - }, - // Node created recently, with no status. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Now(), - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: nil, - }, - // Node created long time ago, with status updated long time ago. 
- { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), - }, - }, - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, - }, - // Node created long time ago, with status updated recently. - { - fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{ - Name: "node0", - CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), - }, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - LastProbeTime: util.Now(), - }, - }, - }, - }, - }, - Fake: client.Fake{ - PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, - }, - }, - expectedRequestCount: 1, // List - expectedActions: nil, - }, - } - - for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) - if err := nodeController.EvictTimeoutedPods(); err != nil { - t.Errorf("unexpected error: %v", err) - } - if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { - t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) - } - if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) { - t.Errorf("actions differs, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions) - } - } -} - -func TestSyncNodeStatusDeletePods(t *testing.T) { +func TestSyncProbedNodeStatusEvictPods(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler fakeKubeletClient *FakeKubeletClient @@ -1041,10 +1024,8 @@ func TestSyncNodeStatusDeletePods(t *testing.T) { for _, item := range table { nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute) - nodeController.lookupIP = func(host string) ([]net.IP, error) { - return nil, fmt.Errorf("lookup %v: no such host", host) - } - if err := nodeController.SyncNodeStatus(); err != nil { + nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) } + if err := nodeController.SyncProbedNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { @@ -1056,103 +1037,322 @@ func TestSyncNodeStatusDeletePods(t *testing.T) { } } -func TestSyncNodeStatus(t *testing.T) { +func TestMonitorNodeStatusEvictPods(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) table := []struct { - fakeNodeHandler *FakeNodeHandler - fakeKubeletClient *FakeKubeletClient - fakeCloud *fake_cloud.FakeCloud - expectedNodes []*api.Node - expectedRequestCount int + fakeNodeHandler *FakeNodeHandler + expectedEvictPods bool + evictionTimeout time.Duration }{ + // Node created recently, with no status (happens only at cluster startup). 
{ fakeNodeHandler: &FakeNodeHandler{ - Existing: []*api.Node{newNode("node0"), newNode("node1")}, - }, - fakeKubeletClient: &FakeKubeletClient{ - Status: probe.Success, - Err: nil, - }, - fakeCloud: &fake_cloud.FakeCloud{ - Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}}, - }, - expectedNodes: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "node0"}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", - }, - { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", - }, - }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: fakeNow, }, }, }, - { - ObjectMeta: api.ObjectMeta{Name: "node1"}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Type: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", - }, - { - Type: api.NodeSchedulable, - Status: api.ConditionFull, - Reason: "Node is schedulable by default", - }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + evictionTimeout: 30 * time.Minute, + expectedEvictPods: false, + }, + // Node created long time ago, and kubelet posted NotReady for a short period of time. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, - Addresses: []api.NodeAddress{ - {Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionNone, + // Node status has just been updated, and transited to NotReady for 10min. + LastProbeTime: util.Date(2015, 1, 1, 11, 59, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC), + }, + }, }, }, }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, }, - expectedRequestCount: 3, // List + 2xUpdate + evictionTimeout: 30 * time.Minute, + expectedEvictPods: false, + }, + // Node created long time ago, and kubelet posted NotReady for a long period of time. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionNone, + // Node status has just been updated, and transited to NotReady for 1hr. + LastProbeTime: util.Date(2015, 1, 1, 11, 59, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + evictionTimeout: 30 * time.Minute, + expectedEvictPods: true, + }, + // Node created long time ago, node controller posted Unknown for a short period of time. 
+ { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionUnknown, + // Node status was updated by nodecontroller 10min ago + LastProbeTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + evictionTimeout: 30 * time.Minute, + expectedEvictPods: false, + }, + // Node created long time ago, node controller posted Unknown for a long period of time. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionUnknown, + // Node status was updated by nodecontroller 1hr ago + LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + evictionTimeout: 30 * time.Minute, + expectedEvictPods: true, }, } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) - if err := nodeController.SyncNodeStatus(); err != nil { + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout) + nodeController.now = func() util.Time { return fakeNow } + if err := nodeController.MonitorNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } - if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { - t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) - } - for i := range item.fakeNodeHandler.UpdatedNodes { - conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions - for j := range conditions { - if conditions[j].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero last transition timestamp") - } - if conditions[j].LastProbeTime.IsZero() { - t.Errorf("unexpected zero last probe timestamp") - } - conditions[j].LastTransitionTime = util.Time{} - conditions[j].LastProbeTime = util.Time{} + podEvicted := false + for _, action := range item.fakeNodeHandler.Actions { + if action.Action == "delete-pod" { + podEvicted = true } } + if item.expectedEvictPods != podEvicted { + t.Errorf("expected pod eviction: %+v, got %+v", item.expectedEvictPods, podEvicted) + } + } +} + +func TestMonitorNodeStatusUpdateStatus(t *testing.T) { + fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) + table := []struct { + fakeNodeHandler *FakeNodeHandler + expectedRequestCount int + expectedNodes []*api.Node + }{ + // Node created long time ago, without status: + // Expect Unknown status posted from node controller. 
+ { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 2, // List+Update + expectedNodes: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionUnknown, + Reason: fmt.Sprintf("Kubelet never posted node status"), + LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + LastTransitionTime: fakeNow, + }, + }, + }, + }, + }, + }, + // Node created recently, without status. + // Expect no action from node controller (within startup grace period). + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: fakeNow, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 1, // List + expectedNodes: nil, + }, + // Node created long time ago, with status updated by kubelet exceeds grace period. + // Expect Unknown status posted from node controller. + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + // Node status hasn't been updated for 1hr. + LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + expectedRequestCount: 2, // List+Update + expectedNodes: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionUnknown, + Reason: fmt.Sprintf("Kubelet stopped posting node status"), + LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC), + LastTransitionTime: fakeNow, + }, + }, + }, + }, + }, + }, + // Node created long time ago, with status updated recently. + // Expect no action from node controller (within monitor grace period). + { + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{ + Name: "node0", + CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Type: api.NodeReady, + Status: api.ConditionFull, + // Node status has just been updated. 
+									LastProbeTime: fakeNow,
+									LastTransitionTime: fakeNow,
+								},
+							},
+						},
+					},
+				},
+				Fake: client.Fake{
+					PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
+				},
+			},
+			expectedRequestCount: 1, // List
+			expectedNodes: nil,
+		},
+	}
+
+	for _, item := range table {
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
+		nodeController.now = func() util.Time { return fakeNow }
+		if err := nodeController.MonitorNodeStatus(); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
+		if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
+			t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
+		}
 		if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
 			t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
 		}
-		// Second sync will also update the node.
-		item.fakeNodeHandler.RequestCount = 0
-		if err := nodeController.SyncNodeStatus(); err != nil {
-			t.Errorf("unexpected error: %v", err)
-		}
-		if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
-			t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
-		}
 	}
 }
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 328c85be6ba..6fb8ac8f3fb 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -75,7 +75,19 @@ const (
 	initialNodeStatusUpdateFrequency = 100 * time.Millisecond
 	nodeStatusUpdateFrequencyInc     = 500 * time.Millisecond
 
-	// The retry count for updating node status at each sync period.
+	// nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
+	// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
+	// in nodecontroller. There are several constraints:
+	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
+	//    N means number of retries allowed for kubelet to post node status. It is pointless
+	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
+	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
+	//    nodeMonitorGracePeriod must also be less than podEvictionTimeout.
+	// 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
+	//    status. Kubelet may fail to update node status reliably if the value is too small,
+	//    as it takes time to gather all necessary node information.
+	nodeStatusUpdateFrequency = 2 * time.Second
+	// nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status fails.
 	nodeStatusUpdateRetry = 5
 )
@@ -124,7 +136,6 @@ func NewMainKubelet(
 	streamingConnectionIdleTimeout time.Duration,
 	recorder record.EventRecorder,
 	cadvisorInterface cadvisor.Interface,
-	statusUpdateFrequency time.Duration,
 	imageGCPolicy ImageGCPolicy) (*Kubelet, error) {
 	if rootDirectory == "" {
 		return nil, fmt.Errorf("invalid root directory %q", rootDirectory)
 	}
@@ -202,7 +213,6 @@ func NewMainKubelet(
 		dockerClient: dockerClient,
 		kubeClient: kubeClient,
 		rootDirectory: rootDirectory,
-		statusUpdateFrequency: statusUpdateFrequency,
 		resyncInterval: resyncInterval,
 		podInfraContainerImage: podInfraContainerImage,
 		containerIDToRef: map[string]*api.ObjectReference{},
@@ -275,7 +285,6 @@ type Kubelet struct {
 	rootDirectory string
 	podInfraContainerImage string
 	podWorkers *podWorkers
-	statusUpdateFrequency time.Duration
 	resyncInterval time.Duration
 	sourcesReady SourcesReadyFn
@@ -532,7 +541,8 @@ func (kl *Kubelet) syncNodeStatus() {
 	if kl.kubeClient == nil {
 		return
 	}
-	for feq := initialNodeStatusUpdateFrequency; feq < kl.statusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
+
+	for feq := initialNodeStatusUpdateFrequency; feq < nodeStatusUpdateFrequency; feq += nodeStatusUpdateFrequencyInc {
 		select {
 		case <-time.After(feq):
 			if err := kl.updateNodeStatus(); err != nil {
@@ -542,7 +552,7 @@
 	}
 	for {
 		select {
-		case <-time.After(kl.statusUpdateFrequency):
+		case <-time.After(nodeStatusUpdateFrequency):
 			if err := kl.updateNodeStatus(); err != nil {
 				glog.Errorf("Unable to update node status: %v", err)
 			}
@@ -1837,20 +1847,23 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
 		node.Spec.Capacity = CapacityFromMachineInfo(info)
 	}
+	currentTime := util.Now()
 	newCondition := api.NodeCondition{
 		Type: api.NodeReady,
 		Status: api.ConditionFull,
 		Reason: fmt.Sprintf("kubelet is posting ready status"),
-		LastProbeTime: util.Now(),
+		LastProbeTime: currentTime,
 	}
 	updated := false
 	for i := range node.Status.Conditions {
 		if node.Status.Conditions[i].Type == api.NodeReady {
+			newCondition.LastTransitionTime = node.Status.Conditions[i].LastTransitionTime
 			node.Status.Conditions[i] = newCondition
 			updated = true
 		}
 	}
 	if !updated {
+		newCondition.LastTransitionTime = currentTime
 		node.Status.Conditions = append(node.Status.Conditions, newCondition)
 	}
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index f72140953cb..8c585854c71 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -3102,10 +3102,11 @@ func TestUpdateNewNodeStatus(t *testing.T) {
 		Status: api.NodeStatus{
 			Conditions: []api.NodeCondition{
 				{
-					Type:          api.NodeReady,
-					Status:        api.ConditionFull,
-					Reason:        fmt.Sprintf("kubelet is posting ready status"),
-					LastProbeTime: util.Time{},
+					Type:               api.NodeReady,
+					Status:             api.ConditionFull,
+					Reason:             fmt.Sprintf("kubelet is posting ready status"),
+					LastProbeTime:      util.Time{},
+					LastTransitionTime: util.Time{},
 				},
 			},
 			NodeInfo: api.NodeSystemInfo{
@@ -3128,7 +3129,11 @@ func TestUpdateNewNodeStatus(t *testing.T) {
 	if updatedNode.Status.Conditions[0].LastProbeTime.IsZero() {
 		t.Errorf("unexpected zero last probe timestamp")
 	}
+	if updatedNode.Status.Conditions[0].LastTransitionTime.IsZero() {
+		t.Errorf("unexpected zero last transition timestamp")
+	}
 	updatedNode.Status.Conditions[0].LastProbeTime = util.Time{}
+	updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
 	if !reflect.DeepEqual(expectedNode, updatedNode) {
 		t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
 	}
@@ -3151,10 +3156,11 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 		Status: api.NodeStatus{
 			Conditions: []api.NodeCondition{
 				{
-					Type:          api.NodeReady,
-					Status:        api.ConditionFull,
-					Reason:        fmt.Sprintf("kubelet is posting ready status"),
-					LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					Type:               api.NodeReady,
+					Status:             api.ConditionFull,
+					Reason:             fmt.Sprintf("kubelet is posting ready status"),
+					LastProbeTime:      util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
+					LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
 				},
 			},
 		},
@@ -3173,10 +3179,11 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 		Status: api.NodeStatus{
 			Conditions: []api.NodeCondition{
 				{
-					Type:          api.NodeReady,
-					Status:        api.ConditionFull,
-					Reason:        fmt.Sprintf("kubelet is posting ready status"),
-					LastProbeTime: util.Time{}, // placeholder
+					Type:               api.NodeReady,
+					Status:             api.ConditionFull,
+					Reason:             fmt.Sprintf("kubelet is posting ready status"),
+					LastProbeTime:      util.Time{}, // placeholder
+					LastTransitionTime: util.Time{}, // placeholder
 				},
 			},
 			NodeInfo: api.NodeSystemInfo{
@@ -3196,11 +3203,16 @@ func TestUpdateExistingNodeStatus(t *testing.T) {
 	if !ok {
 		t.Errorf("unexpected object type")
 	}
+	// Expect LastProbeTime to be updated to Now, while LastTransitionTime remains the same.
 	if reflect.DeepEqual(updatedNode.Status.Conditions[0].LastProbeTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
-		t.Errorf("expected \n%v\n, got \n%v", updatedNode.Status.Conditions[0].LastProbeTime,
+		t.Errorf("expected \n%v\n, got \n%v", util.Now(),
 			util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
 	}
+	if !reflect.DeepEqual(updatedNode.Status.Conditions[0].LastTransitionTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
+		t.Errorf("expected \n%v\n, got \n%v", updatedNode.Status.Conditions[0].LastTransitionTime, util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC))
+	}
 	updatedNode.Status.Conditions[0].LastProbeTime = util.Time{}
+	updatedNode.Status.Conditions[0].LastTransitionTime = util.Time{}
 	if !reflect.DeepEqual(expectedNode, updatedNode) {
 		t.Errorf("expected \n%v\n, got \n%v", expectedNode, updatedNode)
 	}
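
The constraint comment added to pkg/kubelet/kubelet.go boils down to simple duration arithmetic between the kubelet and the node controller. Below is a minimal, self-contained sketch of that arithmetic; the two node-controller values (nodeMonitorGracePeriod, podEvictionTimeout) are assumed here purely for illustration and are defined in pkg/cloudprovider/controller/nodecontroller.go, not taken from this diff.

```go
package main

import (
	"fmt"
	"time"
)

// Values taken from this diff (pkg/kubelet/kubelet.go).
const (
	nodeStatusUpdateFrequency = 2 * time.Second // kubelet posts node status this often
	nodeStatusUpdateRetry     = 5               // retries per posting attempt
)

// Assumed node-controller values, chosen only to make the example concrete.
const (
	nodeMonitorGracePeriod = 10 * time.Second
	podEvictionTimeout     = 5 * time.Minute
)

func main() {
	// Constraint 1: the grace period should cover several missed updates,
	// i.e. at least nodeStatusUpdateRetry * nodeStatusUpdateFrequency.
	minGrace := time.Duration(nodeStatusUpdateRetry) * nodeStatusUpdateFrequency
	fmt.Printf("minimum sensible nodeMonitorGracePeriod: %v\n", minGrace)
	if nodeMonitorGracePeriod < minGrace {
		fmt.Println("grace period too short: a single slow kubelet update flips the node to Unknown")
	}
	// Constraint 2: the grace period must be shorter than the eviction timeout,
	// otherwise pods could be evicted before the node is even marked Unknown.
	if nodeMonitorGracePeriod >= podEvictionTimeout {
		fmt.Println("nodeMonitorGracePeriod must be less than podEvictionTimeout")
	}
}
```

With the assumed values this prints a 10s lower bound and flags nothing, which is the relationship the comment asks future editors of either constant to preserve.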
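The tryUpdateNodeStatus hunk carries the core of the new heartbeat semantics: LastProbeTime always advances to the current time, while LastTransitionTime is carried over from the existing Ready condition (the kubelet always reports ConditionFull, so from its side the status never transitions) and is only initialized when the condition is first created. Here is a standalone sketch of that rule, using a simplified Condition type rather than the real api package.

```go
package main

import (
	"fmt"
	"time"
)

// Simplified stand-in for the relevant api.NodeCondition fields.
type Condition struct {
	Status             string
	LastProbeTime      time.Time
	LastTransitionTime time.Time
}

// updateReadyCondition mirrors the logic added to tryUpdateNodeStatus:
// the probe time is always refreshed; the transition time is copied from
// the existing condition so it keeps marking the last real status change,
// and is set to "now" only when the condition is created for the first time.
func updateReadyCondition(existing *Condition, status string, now time.Time) Condition {
	updated := Condition{Status: status, LastProbeTime: now}
	if existing != nil {
		updated.LastTransitionTime = existing.LastTransitionTime
	} else {
		updated.LastTransitionTime = now
	}
	return updated
}

func main() {
	t0 := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
	first := updateReadyCondition(nil, "Full", t0)
	second := updateReadyCondition(&first, "Full", t0.Add(2*time.Second))
	fmt.Println(second.LastProbeTime)      // advances to t0+2s
	fmt.Println(second.LastTransitionTime) // stays at t0
}
```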
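The new nodecontroller tests pin down the intended MonitorNodeStatus behaviour: do nothing within a startup grace period after node creation or while the last kubelet heartbeat is recent, post NodeReady==ConditionUnknown once the heartbeat goes stale, and delete the node's pods only after the condition has stayed Unknown for longer than the eviction timeout. The sketch below is a hypothetical distillation of those checks for illustration; it is not the controller's actual code, which operates on api.Node objects through a client.

```go
package main

import (
	"fmt"
	"time"
)

type action int

const (
	doNothing action = iota
	postUnknown
	evictPods
)

// decide condenses the cases exercised by the TestMonitorNodeStatus* tables.
func decide(created, lastHeartbeat, lastTransition, now time.Time,
	ready bool, startupGrace, monitorGrace, evictionTimeout time.Duration) action {

	if lastHeartbeat.IsZero() {
		// Kubelet never posted status; allow startupGrace after node creation.
		if now.Sub(created) < startupGrace {
			return doNothing
		}
		return postUnknown
	}
	if now.Sub(lastHeartbeat) < monitorGrace {
		return doNothing // recent heartbeat, nothing to do
	}
	if ready {
		return postUnknown // heartbeat stale: flip Ready to Unknown
	}
	// Already Unknown; evict only after the eviction timeout has elapsed
	// since the condition last transitioned.
	if now.Sub(lastTransition) >= evictionTimeout {
		return evictPods
	}
	return doNothing
}

func main() {
	now := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
	created := time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)
	// Unknown for 1hr with a 30min eviction timeout, as in the second eviction test case.
	fmt.Println(decide(created, now.Add(-time.Hour), now.Add(-time.Hour), now,
		false, time.Minute, 40*time.Second, 30*time.Minute) == evictPods) // true
}
```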