Use fake time in nodecontroller unittest; rename receiver 's' to 'nc'

Deyuan Deng 2015-03-22 21:10:35 -04:00 committed by Deyuan Deng
parent b51d491f05
commit c5675b8924
4 changed files with 238 additions and 233 deletions
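The core technique of this commit is clock injection: NodeController gains a `now func() util.Time` field that defaults to `util.Now`, and the unit tests pin it to a fixed `fakeNow` so the produced conditions can be compared exactly with `reflect.DeepEqual`. Below is a minimal stand-alone sketch of the same pattern, using the standard library's `time.Time` instead of the repo's `util.Time`; the names are illustrative, not the real controller.

```go
package main

import (
	"fmt"
	"time"
)

// fakeableClock mirrors the idea of the new `now func() util.Time` field:
// production code calls c.now(), tests swap in a frozen timestamp.
type fakeableClock struct {
	now func() time.Time // defaults to time.Now; overridden in tests
}

func newClock() *fakeableClock {
	return &fakeableClock{now: time.Now}
}

// stamp returns the current probe time as the controller would record it.
func (c *fakeableClock) stamp() time.Time {
	return c.now()
}

func main() {
	c := newClock()
	fmt.Println("wall clock:", c.stamp())

	// In a test, freeze the clock so every produced timestamp is predictable
	// and conditions can be compared field-for-field.
	fakeNow := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
	c.now = func() time.Time { return fakeNow }
	if !c.stamp().Equal(fakeNow) {
		panic("clock was not frozen")
	}
	fmt.Println("frozen clock:", c.stamp())
}
```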

View File

@ -107,7 +107,9 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"The number of retries for initial node registration. Retry interval equals node_sync_period.") "The number of retries for initial node registration. Retry interval equals node_sync_period.")
fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.") fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.") fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.")
fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controller send probes to kubelets and update NodeStatus.") fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, ""+
"If true, node controller sends probes to kubelet and updates NodeStatus."+
"If false, Kubelet posts NodeStatus to API server.")
// TODO: Discover these by pinging the host machines, and rip out these flags. // TODO: Discover these by pinging the host machines, and rip out these flags.
// TODO: in the meantime, use resource.QuantityFlag() instead of these // TODO: in the meantime, use resource.QuantityFlag() instead of these
fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node") fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")

View File

@ -37,22 +37,20 @@ import (
const ( const (
// The constant is used if sync_nodes_status=False. NodeController will not proactively // The constant is used if sync_nodes_status=False. NodeController will not proactively
// sync node status in this case, but will monitor node status updated from kubelet. If // sync node status in this case, but will monitor node status updated from kubelet. If
// it doesn't receive update for this amount of time, it will start posting node NotReady // it doesn't receive update for this amount of time, it will start posting "NodeReady==
// condition. The amount of time when NodeController start evicting pods is controlled // ConditionUnknown". The amount of time before which NodeController start evicting pods
// via flag 'pod_eviction_timeout'. // is controlled via flag 'pod_eviction_timeout'.
// Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency // Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency
// in kubelet. There are several constraints: // in kubelet. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless // N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// The constant must be less than podEvictionTimeout.
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes // 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
// longer for user to see up-to-date node status. // longer for user to see up-to-date node status.
// 3. nodeStatusUpdateFrequency needs to be large enough for Kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information.
nodeMonitorGracePeriod = 8 * time.Second nodeMonitorGracePeriod = 8 * time.Second
// The constant is used if sync_nodes_status=False, and for node startup. When node // The constant is used if sync_nodes_status=False, only for node startup. When node
// is just created, e.g. cluster bootstrap or node creation, we give a longer grace period. // is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
nodeStartupGracePeriod = 30 * time.Second nodeStartupGracePeriod = 30 * time.Second
// The constant is used if sync_nodes_status=False. It controls NodeController monitoring // The constant is used if sync_nodes_status=False. It controls NodeController monitoring
@ -77,7 +75,9 @@ type NodeController struct {
kubeletClient client.KubeletClient kubeletClient client.KubeletClient
registerRetryCount int registerRetryCount int
podEvictionTimeout time.Duration podEvictionTimeout time.Duration
lookupIP func(host string) ([]net.IP, error) // Method for easy mocking in unittest.
lookupIP func(host string) ([]net.IP, error)
now func() util.Time
} }
// NewNodeController returns a new node controller to sync instances from cloudprovider. // NewNodeController returns a new node controller to sync instances from cloudprovider.
@ -100,6 +100,7 @@ func NewNodeController(
registerRetryCount: registerRetryCount, registerRetryCount: registerRetryCount,
podEvictionTimeout: podEvictionTimeout, podEvictionTimeout: podEvictionTimeout,
lookupIP: net.LookupIP, lookupIP: net.LookupIP,
now: util.Now,
} }
} }
@ -111,36 +112,38 @@ func NewNodeController(
// 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider. // 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider.
// Node created here will only have specs. // Node created here will only have specs.
// 3. Depending on how k8s is configured, there are two ways of syncing the node status: // 3. Depending on how k8s is configured, there are two ways of syncing the node status:
// 3.1 SyncProbedNodeStatus() is called periodically to sync node status for nodes in k8s cluster. // 3.1 SyncProbedNodeStatus() is called periodically to trigger master to probe kubelet,
// 3.2 MonitorNodeStatus() is called periodically to monitor node status posted from kubelet. // and incorporate the resulting node status.
func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) { // 3.2 MonitorNodeStatus() is called periodically to incorporate the results of node status
// pushed from kubelet to master.
func (nc *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
// Register initial set of nodes with their status set. // Register initial set of nodes with their status set.
var nodes *api.NodeList var nodes *api.NodeList
var err error var err error
if s.isRunningCloudProvider() { if nc.isRunningCloudProvider() {
if syncNodeList { if syncNodeList {
if nodes, err = s.GetCloudNodesWithSpec(); err != nil { if nodes, err = nc.GetCloudNodesWithSpec(); err != nil {
glog.Errorf("Error loading initial node from cloudprovider: %v", err) glog.Errorf("Error loading initial node from cloudprovider: %v", err)
} }
} else { } else {
nodes = &api.NodeList{} nodes = &api.NodeList{}
} }
} else { } else {
if nodes, err = s.GetStaticNodesWithSpec(); err != nil { if nodes, err = nc.GetStaticNodesWithSpec(); err != nil {
glog.Errorf("Error loading initial static nodes: %v", err) glog.Errorf("Error loading initial static nodes: %v", err)
} }
} }
if nodes, err = s.PopulateAddresses(nodes); err != nil { if nodes, err = nc.PopulateAddresses(nodes); err != nil {
glog.Errorf("Error getting nodes ips: %v", err) glog.Errorf("Error getting nodes ips: %v", err)
} }
if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil { if err = nc.RegisterNodes(nodes, nc.registerRetryCount, period); err != nil {
glog.Errorf("Error registering node list %+v: %v", nodes, err) glog.Errorf("Error registering node list %+v: %v", nodes, err)
} }
// Start syncing node list from cloudprovider. // Start syncing node list from cloudprovider.
if syncNodeList && s.isRunningCloudProvider() { if syncNodeList && nc.isRunningCloudProvider() {
go util.Forever(func() { go util.Forever(func() {
if err = s.SyncCloudNodes(); err != nil { if err = nc.SyncCloudNodes(); err != nil {
glog.Errorf("Error syncing cloud: %v", err) glog.Errorf("Error syncing cloud: %v", err)
} }
}, period) }, period)
@ -149,13 +152,13 @@ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus
// Start syncing or monitoring node status. // Start syncing or monitoring node status.
if syncNodeStatus { if syncNodeStatus {
go util.Forever(func() { go util.Forever(func() {
if err = s.SyncProbedNodeStatus(); err != nil { if err = nc.SyncProbedNodeStatus(); err != nil {
glog.Errorf("Error syncing status: %v", err) glog.Errorf("Error syncing status: %v", err)
} }
}, period) }, period)
} else { } else {
go util.Forever(func() { go util.Forever(func() {
if err = s.MonitorNodeStatus(); err != nil { if err = nc.MonitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err) glog.Errorf("Error monitoring node status: %v", err)
} }
}, nodeMonitorPeriod) }, nodeMonitorPeriod)
@ -163,19 +166,19 @@ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus
} }
// RegisterNodes registers the given list of nodes; it keeps retrying up to `retryCount` times. // RegisterNodes registers the given list of nodes; it keeps retrying up to `retryCount` times.
func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error { func (nc *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retryInterval time.Duration) error {
if len(nodes.Items) == 0 { if len(nodes.Items) == 0 {
return nil return nil
} }
registered := util.NewStringSet() registered := util.NewStringSet()
nodes = s.canonicalizeName(nodes) nodes = nc.canonicalizeName(nodes)
for i := 0; i < retryCount; i++ { for i := 0; i < retryCount; i++ {
for _, node := range nodes.Items { for _, node := range nodes.Items {
if registered.Has(node.Name) { if registered.Has(node.Name) {
continue continue
} }
_, err := s.kubeClient.Nodes().Create(&node) _, err := nc.kubeClient.Nodes().Create(&node)
if err == nil || apierrors.IsAlreadyExists(err) { if err == nil || apierrors.IsAlreadyExists(err) {
registered.Insert(node.Name) registered.Insert(node.Name)
glog.Infof("Registered node in registry: %s", node.Name) glog.Infof("Registered node in registry: %s", node.Name)
@ -197,12 +200,12 @@ func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retr
} }
// SyncCloudNodes synchronizes the list of instances from cloudprovider to master server. // SyncCloudNodes synchronizes the list of instances from cloudprovider to master server.
func (s *NodeController) SyncCloudNodes() error { func (nc *NodeController) SyncCloudNodes() error {
matches, err := s.GetCloudNodesWithSpec() matches, err := nc.GetCloudNodesWithSpec()
if err != nil { if err != nil {
return err return err
} }
nodes, err := s.kubeClient.Nodes().List() nodes, err := nc.kubeClient.Nodes().List()
if err != nil { if err != nil {
return err return err
} }
@ -216,7 +219,7 @@ func (s *NodeController) SyncCloudNodes() error {
for _, node := range matches.Items { for _, node := range matches.Items {
if _, ok := nodeMap[node.Name]; !ok { if _, ok := nodeMap[node.Name]; !ok {
glog.Infof("Create node in registry: %s", node.Name) glog.Infof("Create node in registry: %s", node.Name)
_, err = s.kubeClient.Nodes().Create(&node) _, err = nc.kubeClient.Nodes().Create(&node)
if err != nil { if err != nil {
glog.Errorf("Create node %s error: %v", node.Name, err) glog.Errorf("Create node %s error: %v", node.Name, err)
} }
@ -227,23 +230,23 @@ func (s *NodeController) SyncCloudNodes() error {
// Delete nodes which have been deleted from cloud, but not from kubernetes cluster. // Delete nodes which have been deleted from cloud, but not from kubernetes cluster.
for nodeID := range nodeMap { for nodeID := range nodeMap {
glog.Infof("Delete node from registry: %s", nodeID) glog.Infof("Delete node from registry: %s", nodeID)
err = s.kubeClient.Nodes().Delete(nodeID) err = nc.kubeClient.Nodes().Delete(nodeID)
if err != nil { if err != nil {
glog.Errorf("Delete node %s error: %v", nodeID, err) glog.Errorf("Delete node %s error: %v", nodeID, err)
} }
s.deletePods(nodeID) nc.deletePods(nodeID)
} }
return nil return nil
} }
// SyncProbedNodeStatus synchronizes cluster nodes status to master server. // SyncProbedNodeStatus synchronizes cluster nodes status to master server.
func (s *NodeController) SyncProbedNodeStatus() error { func (nc *NodeController) SyncProbedNodeStatus() error {
nodes, err := s.kubeClient.Nodes().List() nodes, err := nc.kubeClient.Nodes().List()
if err != nil { if err != nil {
return err return err
} }
nodes, err = s.PopulateNodesStatus(nodes) nodes, err = nc.PopulateNodesStatus(nodes)
if err != nil { if err != nil {
return err return err
} }
@ -252,7 +255,7 @@ func (s *NodeController) SyncProbedNodeStatus() error {
// useful after we introduce per-probe status field, e.g. 'LastProbeTime', which will // useful after we introduce per-probe status field, e.g. 'LastProbeTime', which will
// differ in every call of the sync loop. // differ in every call of the sync loop.
glog.V(2).Infof("updating node %v", node.Name) glog.V(2).Infof("updating node %v", node.Name)
_, err = s.kubeClient.Nodes().Update(&node) _, err = nc.kubeClient.Nodes().Update(&node)
if err != nil { if err != nil {
glog.Errorf("error updating node %s: %v", node.Name, err) glog.Errorf("error updating node %s: %v", node.Name, err)
} }
@ -261,25 +264,25 @@ func (s *NodeController) SyncProbedNodeStatus() error {
} }
// PopulateNodesStatus populates node status for given list of nodes. // PopulateNodesStatus populates node status for given list of nodes.
func (s *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) { func (nc *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(nodes.Items)) wg.Add(len(nodes.Items))
for i := range nodes.Items { for i := range nodes.Items {
go func(node *api.Node) { go func(node *api.Node) {
node.Status.Conditions = s.DoCheck(node) node.Status.Conditions = nc.DoCheck(node)
if err := s.populateNodeInfo(node); err != nil { if err := nc.populateNodeInfo(node); err != nil {
glog.Errorf("Can't collect information for node %s: %v", node.Name, err) glog.Errorf("Can't collect information for node %s: %v", node.Name, err)
} }
wg.Done() wg.Done()
}(&nodes.Items[i]) }(&nodes.Items[i])
} }
wg.Wait() wg.Wait()
return s.PopulateAddresses(nodes) return nc.PopulateAddresses(nodes)
} }
// populateNodeInfo gets node info from kubelet and update the node. // populateNodeInfo gets node info from kubelet and update the node.
func (s *NodeController) populateNodeInfo(node *api.Node) error { func (nc *NodeController) populateNodeInfo(node *api.Node) error {
nodeInfo, err := s.kubeletClient.GetNodeInfo(node.Name) nodeInfo, err := nc.kubeletClient.GetNodeInfo(node.Name)
if err != nil { if err != nil {
return err return err
} }
@ -291,96 +294,96 @@ func (s *NodeController) populateNodeInfo(node *api.Node) error {
} }
// DoCheck performs various condition checks for given node. // DoCheck performs various condition checks for given node.
func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition { func (nc *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
var conditions []api.NodeCondition var conditions []api.NodeCondition
// Check Condition: NodeReady. TODO: More node conditions. // Check Condition: NodeReady. TODO: More node conditions.
oldReadyCondition := s.getCondition(node, api.NodeReady) oldReadyCondition := nc.getCondition(node, api.NodeReady)
newReadyCondition := s.checkNodeReady(node) newReadyCondition := nc.checkNodeReady(node)
s.updateLastTransitionTime(oldReadyCondition, newReadyCondition) nc.updateLastTransitionTime(oldReadyCondition, newReadyCondition)
if newReadyCondition.Status != api.ConditionFull { if newReadyCondition.Status != api.ConditionFull {
// Node is not ready for this probe, we need to check if pods need to be deleted. // Node is not ready for this probe, we need to check if pods need to be deleted.
if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) {
// As long as the node fails, we call delete pods to delete all pods. Node controller sync // As long as the node fails, we call delete pods to delete all pods. Node controller sync
// is not a closed loop process; there is no feedback from other components regarding pod // is not a closed loop process; there is no feedback from other components regarding pod
// status. Continuing to list pods to sanity-check that all pods are deleted makes more sense. // status. Continuing to list pods to sanity-check that all pods are deleted makes more sense.
s.deletePods(node.Name) nc.deletePods(node.Name)
} }
} }
conditions = append(conditions, *newReadyCondition) conditions = append(conditions, *newReadyCondition)
// Check Condition: NodeSchedulable // Check Condition: NodeSchedulable
oldSchedulableCondition := s.getCondition(node, api.NodeSchedulable) oldSchedulableCondition := nc.getCondition(node, api.NodeSchedulable)
newSchedulableCondition := s.checkNodeSchedulable(node) newSchedulableCondition := nc.checkNodeSchedulable(node)
s.updateLastTransitionTime(oldSchedulableCondition, newSchedulableCondition) nc.updateLastTransitionTime(oldSchedulableCondition, newSchedulableCondition)
conditions = append(conditions, *newSchedulableCondition) conditions = append(conditions, *newSchedulableCondition)
return conditions return conditions
} }
// updateLastTransitionTime updates LastTransitionTime for the newCondition based on oldCondition. // updateLastTransitionTime updates LastTransitionTime for the newCondition based on oldCondition.
func (s *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) { func (nc *NodeController) updateLastTransitionTime(oldCondition, newCondition *api.NodeCondition) {
if oldCondition != nil && oldCondition.Status == newCondition.Status { if oldCondition != nil && oldCondition.Status == newCondition.Status {
// If node status doesn't change, transition time is same as last time. // If node status doesn't change, transition time is same as last time.
newCondition.LastTransitionTime = oldCondition.LastTransitionTime newCondition.LastTransitionTime = oldCondition.LastTransitionTime
} else { } else {
// Set transition time to Now() if node status changes or `oldCondition` is nil, which // Set transition time to Now() if node status changes or `oldCondition` is nil, which
// happens only when the node is checked for the first time. // happens only when the node is checked for the first time.
newCondition.LastTransitionTime = util.Now() newCondition.LastTransitionTime = nc.now()
} }
} }
// checkNodeSchedulable checks node schedulable condition, without transition timestamp set. // checkNodeSchedulable checks node schedulable condition, without transition timestamp set.
func (s *NodeController) checkNodeSchedulable(node *api.Node) *api.NodeCondition { func (nc *NodeController) checkNodeSchedulable(node *api.Node) *api.NodeCondition {
if node.Spec.Unschedulable { if node.Spec.Unschedulable {
return &api.NodeCondition{ return &api.NodeCondition{
Type: api.NodeSchedulable, Type: api.NodeSchedulable,
Status: api.ConditionNone, Status: api.ConditionNone,
Reason: "User marked unschedulable during node create/update", Reason: "User marked unschedulable during node create/update",
LastProbeTime: util.Now(), LastProbeTime: nc.now(),
} }
} else { } else {
return &api.NodeCondition{ return &api.NodeCondition{
Type: api.NodeSchedulable, Type: api.NodeSchedulable,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: "Node is schedulable by default", Reason: "Node is schedulable by default",
LastProbeTime: util.Now(), LastProbeTime: nc.now(),
} }
} }
} }
// checkNodeReady checks raw node ready condition, without transition timestamp set. // checkNodeReady checks raw node ready condition, without transition timestamp set.
func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition { func (nc *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
switch status, err := s.kubeletClient.HealthCheck(node.Name); { switch status, err := nc.kubeletClient.HealthCheck(node.Name); {
case err != nil: case err != nil:
glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err) glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err)
return &api.NodeCondition{ return &api.NodeCondition{
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionUnknown, Status: api.ConditionUnknown,
Reason: fmt.Sprintf("Node health check error: %v", err), Reason: fmt.Sprintf("Node health check error: %v", err),
LastProbeTime: util.Now(), LastProbeTime: nc.now(),
} }
case status == probe.Failure: case status == probe.Failure:
return &api.NodeCondition{ return &api.NodeCondition{
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionNone, Status: api.ConditionNone,
Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"), Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"),
LastProbeTime: util.Now(), LastProbeTime: nc.now(),
} }
default: default:
return &api.NodeCondition{ return &api.NodeCondition{
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"), Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"),
LastProbeTime: util.Now(), LastProbeTime: nc.now(),
} }
} }
} }
// PopulateAddresses queries Address for given list of nodes. // PopulateAddresses queries Address for given list of nodes.
func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) { func (nc *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
if s.isRunningCloudProvider() { if nc.isRunningCloudProvider() {
instances, ok := s.cloud.Instances() instances, ok := nc.cloud.Instances()
if !ok { if !ok {
return nodes, ErrCloudInstance return nodes, ErrCloudInstance
} }
@ -401,7 +404,7 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList,
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()} address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()}
node.Status.Addresses = []api.NodeAddress{address} node.Status.Addresses = []api.NodeAddress{address}
} else { } else {
addrs, err := s.lookupIP(node.Name) addrs, err := nc.lookupIP(node.Name)
if err != nil { if err != nil {
glog.Errorf("Can't get ip address of node %s: %v", node.Name, err) glog.Errorf("Can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 { } else if len(addrs) == 0 {
@ -416,11 +419,11 @@ func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList,
return nodes, nil return nodes, nil
} }
// MonitorNodeStatus verifies node status are constantly updated by kubelet, and if // MonitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
// not, post node NotReady status. It also evicts all pods if node is not ready for // post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
// a long period of time. // not reachable for a long period of time.
func (s *NodeController) MonitorNodeStatus() error { func (nc *NodeController) MonitorNodeStatus() error {
nodes, err := s.kubeClient.Nodes().List() nodes, err := nc.kubeClient.Nodes().List()
if err != nil { if err != nil {
return err return err
} }
@ -428,7 +431,7 @@ func (s *NodeController) MonitorNodeStatus() error {
var gracePeriod time.Duration var gracePeriod time.Duration
var lastReadyCondition api.NodeCondition var lastReadyCondition api.NodeCondition
node := &nodes.Items[i] node := &nodes.Items[i]
readyCondition := s.getCondition(node, api.NodeReady) readyCondition := nc.getCondition(node, api.NodeReady)
if readyCondition == nil { if readyCondition == nil {
// If ready condition is nil, then kubelet (or nodecontroller) never posted node status. // If ready condition is nil, then kubelet (or nodecontroller) never posted node status.
// A fake ready condition is created, where LastProbeTime and LastTransitionTime is set // A fake ready condition is created, where LastProbeTime and LastTransitionTime is set
@ -447,7 +450,7 @@ func (s *NodeController) MonitorNodeStatus() error {
} }
// Check last time when NodeReady was updated. // Check last time when NodeReady was updated.
if util.Now().After(lastReadyCondition.LastProbeTime.Add(gracePeriod)) { if nc.now().After(lastReadyCondition.LastProbeTime.Add(gracePeriod)) {
// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown // NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
// (regardless of its current value) in the master, without contacting kubelet. // (regardless of its current value) in the master, without contacting kubelet.
if readyCondition == nil { if readyCondition == nil {
@ -456,21 +459,21 @@ func (s *NodeController) MonitorNodeStatus() error {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionUnknown, Status: api.ConditionUnknown,
Reason: fmt.Sprintf("Kubelet never posted node status"), Reason: fmt.Sprintf("Kubelet never posted node status"),
LastProbeTime: util.Now(), LastProbeTime: node.CreationTimestamp,
LastTransitionTime: util.Now(), LastTransitionTime: nc.now(),
}) })
} else { } else {
// Note here the out-dated condition can be the one posted by nodecontroller glog.V(2).Infof("node %v hasn't been updated for %+v. Last ready condition is: %+v",
// itself before. We keep posting the status to keep LastProbeTime fresh. node.Name, nc.now().Time.Sub(lastReadyCondition.LastProbeTime.Time), lastReadyCondition)
glog.V(2).Infof("node %v hasn't been updated for a while, last ready condition is %+v", node.Name, readyCondition)
readyCondition.Status = api.ConditionUnknown
readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status")
readyCondition.LastProbeTime = util.Now()
if lastReadyCondition.Status != api.ConditionUnknown { if lastReadyCondition.Status != api.ConditionUnknown {
readyCondition.LastTransitionTime = util.Now() readyCondition.Status = api.ConditionUnknown
readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status")
// LastProbeTime is the last time we heard from kubelet.
readyCondition.LastProbeTime = lastReadyCondition.LastProbeTime
readyCondition.LastTransitionTime = nc.now()
} }
} }
_, err = s.kubeClient.Nodes().Update(node) _, err = nc.kubeClient.Nodes().Update(node)
if err != nil { if err != nil {
glog.Errorf("error updating node %s: %v", node.Name, err) glog.Errorf("error updating node %s: %v", node.Name, err)
} }
@ -479,15 +482,15 @@ func (s *NodeController) MonitorNodeStatus() error {
if readyCondition != nil { if readyCondition != nil {
// Check eviction timeout. // Check eviction timeout.
if lastReadyCondition.Status == api.ConditionNone && if lastReadyCondition.Status == api.ConditionNone &&
util.Now().After(lastReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { nc.now().After(lastReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) {
// Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node. // Node stays in not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
s.deletePods(node.Name) nc.deletePods(node.Name)
} }
if lastReadyCondition.Status == api.ConditionUnknown && if lastReadyCondition.Status == api.ConditionUnknown &&
util.Now().After(lastReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout-gracePeriod)) { nc.now().After(lastReadyCondition.LastProbeTime.Add(nc.podEvictionTimeout-gracePeriod)) {
// Same as above. Note however, since condition unknown is posted by node controller, which means we // Same as above. Note however, since condition unknown is posted by node controller, which means we
// need to subtract monitoring grace period in order to get the real 'podEvictionTimeout'. // need to subtract monitoring grace period in order to get the real 'podEvictionTimeout'.
s.deletePods(node.Name) nc.deletePods(node.Name)
} }
} }
} }
@ -497,12 +500,12 @@ func (s *NodeController) MonitorNodeStatus() error {
// GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error // GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error
// occurs, an empty NodeList will be returned with a non-nil error info. The method only // occurs, an empty NodeList will be returned with a non-nil error info. The method only
// constructs spec fields for nodes. // constructs spec fields for nodes.
func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) { func (nc *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
result := &api.NodeList{} result := &api.NodeList{}
for _, nodeID := range s.nodes { for _, nodeID := range nc.nodes {
node := api.Node{ node := api.Node{
ObjectMeta: api.ObjectMeta{Name: nodeID}, ObjectMeta: api.ObjectMeta{Name: nodeID},
Spec: api.NodeSpec{Capacity: s.staticResources.Capacity}, Spec: api.NodeSpec{Capacity: nc.staticResources.Capacity},
} }
result.Items = append(result.Items, node) result.Items = append(result.Items, node)
} }
@ -512,13 +515,13 @@ func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
// GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error // GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error
// occurs, an empty NodeList will be returned with a non-nil error info. The method only // occurs, an empty NodeList will be returned with a non-nil error info. The method only
// constructs spec fields for nodes. // constructs spec fields for nodes.
func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) { func (nc *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
result := &api.NodeList{} result := &api.NodeList{}
instances, ok := s.cloud.Instances() instances, ok := nc.cloud.Instances()
if !ok { if !ok {
return result, ErrCloudInstance return result, ErrCloudInstance
} }
matches, err := instances.List(s.matchRE) matches, err := instances.List(nc.matchRE)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -530,7 +533,7 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
return nil, err return nil, err
} }
if resources == nil { if resources == nil {
resources = s.staticResources resources = nc.staticResources
} }
if resources != nil { if resources != nil {
node.Spec.Capacity = resources.Capacity node.Spec.Capacity = resources.Capacity
@ -547,10 +550,10 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
} }
// deletePods will delete all pods from master running on given node. // deletePods will delete all pods from master running on given node.
func (s *NodeController) deletePods(nodeID string) error { func (nc *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID) glog.V(2).Infof("Delete all pods from %v", nodeID)
// TODO: We don't yet have field selectors from client, see issue #1362. // TODO: We don't yet have field selectors from client, see issue #1362.
pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
if err != nil { if err != nil {
return err return err
} }
@ -559,7 +562,7 @@ func (s *NodeController) deletePods(nodeID string) error {
continue continue
} }
glog.V(2).Infof("Delete pod %v", pod.Name) glog.V(2).Infof("Delete pod %v", pod.Name)
if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil { if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil {
glog.Errorf("Error deleting pod %v: %v", pod.Name, err) glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
} }
} }
@ -568,12 +571,12 @@ func (s *NodeController) deletePods(nodeID string) error {
} }
// isRunningCloudProvider checks if cluster is running with cloud provider. // isRunningCloudProvider checks if cluster is running with cloud provider.
func (s *NodeController) isRunningCloudProvider() bool { func (nc *NodeController) isRunningCloudProvider() bool {
return s.cloud != nil && len(s.matchRE) > 0 return nc.cloud != nil && len(nc.matchRE) > 0
} }
// canonicalizeName takes a node list and lowercases all nodes' name. // canonicalizeName takes a node list and lowercases all nodes' name.
func (s *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList { func (nc *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList {
for i := range nodes.Items { for i := range nodes.Items {
nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name) nodes.Items[i].Name = strings.ToLower(nodes.Items[i].Name)
} }
@ -582,7 +585,7 @@ func (s *NodeController) canonicalizeName(nodes *api.NodeList) *api.NodeList {
// getCondition returns a condition object for the specific condition // getCondition returns a condition object for the specific condition
// type, nil if the condition is not set. // type, nil if the condition is not set.
func (s *NodeController) getCondition(node *api.Node, conditionType api.NodeConditionType) *api.NodeCondition { func (nc *NodeController) getCondition(node *api.Node, conditionType api.NodeConditionType) *api.NodeCondition {
for i := range node.Status.Conditions { for i := range node.Status.Conditions {
if node.Status.Conditions[i].Type == conditionType { if node.Status.Conditions[i].Type == conditionType {
return &node.Status.Conditions[i] return &node.Status.Conditions[i]
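To make the timing logic above easier to follow, here is an illustrative, self-contained computation of the two checks MonitorNodeStatus performs against the injected clock: a stale NodeReady condition is rewritten as ConditionUnknown once the grace period passes, and pods are evicted after the eviction timeout (minus the grace period when the Unknown condition was posted by the controller itself). The durations are borrowed from this diff (8s grace period, the 5-minute eviction timeout used by the tests); the snippet is a sketch, not code from the tree.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		nodeMonitorGracePeriod = 8 * time.Second
		podEvictionTimeout     = 5 * time.Minute
	)
	now := time.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC) // plays the role of nc.now()

	// Check 1: kubelet last posted NodeReady 10s ago, which exceeds the 8s grace
	// period, so the controller would overwrite the condition with Unknown.
	lastProbe := now.Add(-10 * time.Second)
	if now.After(lastProbe.Add(nodeMonitorGracePeriod)) {
		fmt.Println("NodeReady is stale -> set ConditionUnknown")
	}

	// Check 2: the Unknown condition was posted by the controller itself, so the
	// effective eviction deadline is podEvictionTimeout minus the grace period,
	// measured from the last time the kubelet was actually heard from.
	lastHeard := now.Add(-6 * time.Minute)
	if now.After(lastHeard.Add(podEvictionTimeout - nodeMonitorGracePeriod)) {
		fmt.Println("node unreachable past eviction timeout -> delete its pods")
	}
}
```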

View File

@ -563,6 +563,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
} }
func TestNodeConditionsCheck(t *testing.T) { func TestNodeConditionsCheck(t *testing.T) {
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
table := []struct { table := []struct {
node *api.Node node *api.Node
fakeKubeletClient *FakeKubeletClient fakeKubeletClient *FakeKubeletClient
@ -578,14 +579,18 @@ func TestNodeConditionsCheck(t *testing.T) {
}, },
expectedConditions: []api.NodeCondition{ expectedConditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
{ {
Type: api.NodeSchedulable, Type: api.NodeSchedulable,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: "Node is schedulable by default", Reason: "Node is schedulable by default",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
}, },
}, },
@ -599,14 +604,18 @@ func TestNodeConditionsCheck(t *testing.T) {
}, },
expectedConditions: []api.NodeCondition{ expectedConditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionNone, Status: api.ConditionNone,
Reason: "Node health check failed: kubelet /healthz endpoint returns not ok", Reason: "Node health check failed: kubelet /healthz endpoint returns not ok",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
{ {
Type: api.NodeSchedulable, Type: api.NodeSchedulable,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: "Node is schedulable by default", Reason: "Node is schedulable by default",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
}, },
}, },
@ -620,14 +629,18 @@ func TestNodeConditionsCheck(t *testing.T) {
}, },
expectedConditions: []api.NodeCondition{ expectedConditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionUnknown, Status: api.ConditionUnknown,
Reason: "Node health check error: Error", Reason: "Node health check error: Error",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
{ {
Type: api.NodeSchedulable, Type: api.NodeSchedulable,
Status: api.ConditionNone, Status: api.ConditionNone,
Reason: "User marked unschedulable during node create/update", Reason: "User marked unschedulable during node create/update",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
}, },
}, },
@ -635,17 +648,8 @@ func TestNodeConditionsCheck(t *testing.T) {
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute) nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute)
nodeController.now = func() util.Time { return fakeNow }
conditions := nodeController.DoCheck(item.node) conditions := nodeController.DoCheck(item.node)
for i := range conditions {
if conditions[i].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
if conditions[i].LastProbeTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
conditions[i].LastTransitionTime = util.Time{}
conditions[i].LastProbeTime = util.Time{}
}
if !reflect.DeepEqual(item.expectedConditions, conditions) { if !reflect.DeepEqual(item.expectedConditions, conditions) {
t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions) t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions)
} }
@ -689,6 +693,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
} }
func TestSyncProbedNodeStatus(t *testing.T) { func TestSyncProbedNodeStatus(t *testing.T) {
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
table := []struct { table := []struct {
fakeNodeHandler *FakeNodeHandler fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient fakeKubeletClient *FakeKubeletClient
@ -713,14 +718,18 @@ func TestSyncProbedNodeStatus(t *testing.T) {
Status: api.NodeStatus{ Status: api.NodeStatus{
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
{ {
Type: api.NodeSchedulable, Type: api.NodeSchedulable,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: "Node is schedulable by default", Reason: "Node is schedulable by default",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
}, },
Addresses: []api.NodeAddress{ Addresses: []api.NodeAddress{
@ -733,14 +742,18 @@ func TestSyncProbedNodeStatus(t *testing.T) {
Status: api.NodeStatus{ Status: api.NodeStatus{
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
{ {
Type: api.NodeSchedulable, Type: api.NodeSchedulable,
Status: api.ConditionFull, Status: api.ConditionFull,
Reason: "Node is schedulable by default", Reason: "Node is schedulable by default",
LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
}, },
Addresses: []api.NodeAddress{ Addresses: []api.NodeAddress{
@ -755,25 +768,13 @@ func TestSyncProbedNodeStatus(t *testing.T) {
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.SyncProbedNodeStatus(); err != nil { if err := nodeController.SyncProbedNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
} }
for i := range item.fakeNodeHandler.UpdatedNodes {
conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions
for j := range conditions {
if conditions[j].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
if conditions[j].LastProbeTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
conditions[j].LastTransitionTime = util.Time{}
conditions[j].LastProbeTime = util.Time{}
}
}
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
} }
@ -789,15 +790,17 @@ func TestSyncProbedNodeStatus(t *testing.T) {
} }
func TestSyncProbedNodeStatusTransitionTime(t *testing.T) { func TestSyncProbedNodeStatusTransitionTime(t *testing.T) {
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
table := []struct { table := []struct {
fakeNodeHandler *FakeNodeHandler fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient fakeKubeletClient *FakeKubeletClient
expectedRequestCount int expectedRequestCount int
expectedTransitionTimeChange bool expectedTransitionTime util.Time
}{ }{
{ {
// Existing node is healthy, current probe is healthy too. // Existing node is healthy, current probe is healthy too.
// Existing node is schedulable, again explicitly mark node as schedulable. // Existing node is schedulable, again explicitly mark node as schedulable.
// Expect transition time to stay the same as before.
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
{ {
@ -826,12 +829,13 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) {
Status: probe.Success, Status: probe.Success,
Err: nil, Err: nil,
}, },
expectedRequestCount: 2, // List+Update expectedRequestCount: 2, // List+Update
expectedTransitionTimeChange: false, expectedTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
}, },
{ {
// Existing node is healthy, current probe is unhealthy. // Existing node is healthy, current probe is unhealthy.
// Existing node is schedulable, mark node as unschedulable. // Existing node is schedulable, mark node as unschedulable.
// Expect transition time to be now.
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
{ {
@ -860,34 +864,25 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) {
Status: probe.Failure, Status: probe.Failure,
Err: nil, Err: nil,
}, },
expectedRequestCount: 2, // List+Update expectedRequestCount: 2, // List+Update
expectedTransitionTimeChange: true, expectedTransitionTime: fakeNow,
}, },
} }
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
nodeController.lookupIP = func(host string) ([]net.IP, error) { nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
return nil, fmt.Errorf("lookup %v: no such host", host) nodeController.now = func() util.Time { return fakeNow }
}
if err := nodeController.SyncProbedNodeStatus(); err != nil { if err := nodeController.SyncProbedNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
} }
for i := range item.fakeNodeHandler.UpdatedNodes { for _, node := range item.fakeNodeHandler.UpdatedNodes {
conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions for _, condition := range node.Status.Conditions {
for j := range conditions { if !condition.LastTransitionTime.Time.Equal(item.expectedTransitionTime.Time) {
condition := conditions[j] t.Errorf("expected last transition time %v, but got %v", item.expectedTransitionTime, condition.LastTransitionTime)
if item.expectedTransitionTimeChange {
if !condition.LastTransitionTime.After(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime)
}
} else {
if !condition.LastTransitionTime.Equal(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) {
t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime)
}
} }
} }
} }
@ -1029,9 +1024,7 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) {
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute) nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute)
nodeController.lookupIP = func(host string) ([]net.IP, error) { nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
return nil, fmt.Errorf("lookup %v: no such host", host)
}
if err := nodeController.SyncProbedNodeStatus(); err != nil { if err := nodeController.SyncProbedNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -1045,9 +1038,11 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) {
} }
func TestMonitorNodeStatusEvictPods(t *testing.T) { func TestMonitorNodeStatusEvictPods(t *testing.T) {
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
table := []struct { table := []struct {
fakeNodeHandler *FakeNodeHandler fakeNodeHandler *FakeNodeHandler
expectedEvictPods bool expectedEvictPods bool
evictionTimeout time.Duration
}{ }{
// Node created recently, with no status (happens only at cluster startup). // Node created recently, with no status (happens only at cluster startup).
{ {
@ -1056,7 +1051,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
{ {
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "node0", Name: "node0",
CreationTimestamp: util.Now(), CreationTimestamp: fakeNow,
}, },
}, },
}, },
@ -1064,9 +1059,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
}, },
}, },
evictionTimeout: 30 * time.Minute,
expectedEvictPods: false, expectedEvictPods: false,
}, },
// Node created long time ago, with not ready status updated by kubelet for a short time. // Node created long time ago, and kubelet posted NotReady for a short period of time.
{ {
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
@ -1078,10 +1074,11 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
Status: api.NodeStatus{ Status: api.NodeStatus{
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionNone, Status: api.ConditionNone,
LastProbeTime: util.Now(), // Node status has just been updated, and the node transitioned to NotReady 10 minutes ago.
LastTransitionTime: util.Now(), LastProbeTime: util.Date(2015, 1, 1, 11, 59, 0, 0, time.UTC),
LastTransitionTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC),
}, },
}, },
}, },
@ -1091,9 +1088,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
}, },
}, },
evictionTimeout: 30 * time.Minute,
expectedEvictPods: false, expectedEvictPods: false,
}, },
// Node created long time ago, with not ready status updated by kubelet for a long time. // Node created long time ago, and kubelet posted NotReady for a long period of time.
{ {
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
@ -1105,10 +1103,11 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
Status: api.NodeStatus{ Status: api.NodeStatus{
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionUnknown, Status: api.ConditionNone,
LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), // Node status has just been updated, and the node transitioned to NotReady 1 hour ago.
LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastProbeTime: util.Date(2015, 1, 1, 11, 59, 0, 0, time.UTC),
LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
}, },
}, },
}, },
@ -1118,9 +1117,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
}, },
}, },
evictionTimeout: 30 * time.Minute,
expectedEvictPods: true, expectedEvictPods: true,
}, },
// Node created long time ago, with unknown status updated by node controller for a short time. // Node created long time ago, node controller posted Unknown for a short period of time.
{ {
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
@ -1132,10 +1132,11 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
Status: api.NodeStatus{ Status: api.NodeStatus{
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionUnknown, Status: api.ConditionUnknown,
LastProbeTime: util.Now(), // Node status was updated by nodecontroller 10min ago
LastTransitionTime: util.Now(), LastProbeTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC),
LastTransitionTime: util.Date(2015, 1, 1, 11, 50, 0, 0, time.UTC),
}, },
}, },
}, },
@ -1145,9 +1146,10 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
}, },
}, },
evictionTimeout: 30 * time.Minute,
expectedEvictPods: false, expectedEvictPods: false,
}, },
// Node created long time ago, with unknown status updated by node controller for a long time. // Node created long time ago, node controller posted Unknown for a long period of time.
{ {
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
@ -1159,10 +1161,11 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
Status: api.NodeStatus{ Status: api.NodeStatus{
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionUnknown, Status: api.ConditionUnknown,
LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), // Node status was updated by nodecontroller 1hr ago
LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
}, },
}, },
}, },
@ -1172,12 +1175,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
}, },
}, },
evictionTimeout: 30 * time.Minute,
expectedEvictPods: true, expectedEvictPods: true,
}, },
} }
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.MonitorNodeStatus(); err != nil { if err := nodeController.MonitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
@ -1194,12 +1199,14 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
} }
func TestMonitorNodeStatusUpdateStatus(t *testing.T) { func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
fakeNow := util.Date(2015, 1, 1, 12, 0, 0, 0, time.UTC)
table := []struct { table := []struct {
fakeNodeHandler *FakeNodeHandler fakeNodeHandler *FakeNodeHandler
expectedRequestCount int expectedRequestCount int
expectedNodes []*api.Node expectedNodes []*api.Node
}{ }{
// Node created long time ago, with no status. // Node created long time ago, without status:
// Expect Unknown status posted from node controller.
{ {
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
@ -1227,22 +1234,23 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionUnknown, Status: api.ConditionUnknown,
Reason: fmt.Sprintf("Kubelet never posted node status"), Reason: fmt.Sprintf("Kubelet never posted node status"),
LastProbeTime: util.Time{}, LastProbeTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastTransitionTime: util.Time{}, LastTransitionTime: fakeNow,
}, },
}, },
}, },
}, },
}, },
}, },
// Node created recently, with no status. // Node created recently, without status.
// Expect no action from node controller (within startup grace period).
{ {
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
{ {
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "node0", Name: "node0",
CreationTimestamp: util.Now(), CreationTimestamp: fakeNow,
}, },
}, },
}, },
@ -1253,7 +1261,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
expectedRequestCount: 1, // List expectedRequestCount: 1, // List
expectedNodes: nil, expectedNodes: nil,
}, },
// Node created long time ago, with status updated long time ago. // Node created long time ago, with status updated by kubelet exceeding the grace period.
// Expect Unknown status posted from node controller.
{ {
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
@ -1265,10 +1274,11 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
Status: api.NodeStatus{ Status: api.NodeStatus{
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionFull, Status: api.ConditionFull,
LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), // Node status hasn't been updated for 1hr.
LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC), LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
LastTransitionTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
}, },
}, },
}, },
@ -1291,8 +1301,8 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionUnknown, Status: api.ConditionUnknown,
Reason: fmt.Sprintf("Kubelet stopped posting node status"), Reason: fmt.Sprintf("Kubelet stopped posting node status"),
LastProbeTime: util.Time{}, LastProbeTime: util.Date(2015, 1, 1, 11, 0, 0, 0, time.UTC),
LastTransitionTime: util.Time{}, LastTransitionTime: fakeNow,
}, },
}, },
}, },
@ -1300,6 +1310,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
}, },
}, },
// Node created long time ago, with status updated recently. // Node created long time ago, with status updated recently.
// Expect no action from node controller (within monitor grace period).
{ {
fakeNodeHandler: &FakeNodeHandler{ fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{ Existing: []*api.Node{
@ -1311,10 +1322,11 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
Status: api.NodeStatus{ Status: api.NodeStatus{
Conditions: []api.NodeCondition{ Conditions: []api.NodeCondition{
{ {
Type: api.NodeReady, Type: api.NodeReady,
Status: api.ConditionFull, Status: api.ConditionFull,
LastProbeTime: util.Now(), // Node status has just been updated.
LastTransitionTime: util.Time{}, LastProbeTime: fakeNow,
LastTransitionTime: fakeNow,
}, },
}, },
}, },
@ -1331,25 +1343,13 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
for _, item := range table { for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute) nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
nodeController.now = func() util.Time { return fakeNow }
if err := nodeController.MonitorNodeStatus(); err != nil { if err := nodeController.MonitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
} }
for i := range item.fakeNodeHandler.UpdatedNodes {
conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions
for j := range conditions {
if conditions[j].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
if conditions[j].LastProbeTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
conditions[j].LastTransitionTime = util.Time{}
conditions[j].LastProbeTime = util.Time{}
}
}
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
} }

View File

@ -75,20 +75,20 @@ const (
initialNodeStatusUpdateFrequency = 100 * time.Millisecond initialNodeStatusUpdateFrequency = 100 * time.Millisecond
nodeStatusUpdateFrequencyInc = 500 * time.Millisecond nodeStatusUpdateFrequencyInc = 500 * time.Millisecond
// Node status update frequency and retry count. // nodeStatusUpdateFrequency specifies how often kubelet posts node status to master.
// Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod // Note: be cautious when changing the constant, it must work with nodeMonitorGracePeriod
// in nodecontroller. There are several constraints: // in nodecontroller. There are several constraints:
// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where // 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
// N means number of retries allowed for kubelet to post node status. It is pointless // N means number of retries allowed for kubelet to post node status. It is pointless
// to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there // to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
// will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency. // will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes // The constant must be less than podEvictionTimeout.
// longer for user to see up-to-date node status. // 2. nodeStatusUpdateFrequency needs to be large enough for kubelet to generate node
// 3. nodeStatusUpdateFrequency needs to be large enough for Kubelet to generate node
// status. Kubelet may fail to update node status reliably if the value is too small, // status. Kubelet may fail to update node status reliably if the value is too small,
// as it takes time to gather all necessary node information. // as it takes time to gather all necessary node information.
nodeStatusUpdateFrequency = 2 * time.Second nodeStatusUpdateFrequency = 2 * time.Second
nodeStatusUpdateRetry = 5 // nodeStatusUpdateRetry specifies how many times kubelet retries when posting node status fails.
nodeStatusUpdateRetry = 5
) )
var ( var (
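The comments in both files couple the kubelet's nodeStatusUpdateFrequency to the node controller's nodeMonitorGracePeriod. A throwaway sanity check of the values as they stand in this diff (2s and 8s) is sketched below; it is illustrative only, not code that belongs in either file.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	nodeStatusUpdateFrequency := 2 * time.Second // kubelet posts node status this often
	nodeMonitorGracePeriod := 8 * time.Second    // nodecontroller waits this long before posting Unknown

	// How many kubelet posting intervals fit inside the grace period.
	n := int(nodeMonitorGracePeriod / nodeStatusUpdateFrequency)
	fmt.Printf("grace period allows %d missed status posts\n", n) // prints 4

	if nodeMonitorGracePeriod <= nodeStatusUpdateFrequency {
		fmt.Println("misconfigured: grace period must be several times the update frequency")
	}
}
```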