Node controller monitor node status

Deyuan Deng 2015-03-11 23:00:52 -04:00
parent a4c02d8ede
commit 0d5f8dfde1
3 changed files with 401 additions and 281 deletions

View File

@ -107,7 +107,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
"The number of retries for initial node registration. Retry interval equals node_sync_period.")
fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")
fs.BoolVar(&s.SyncNodeList, "sync_nodes", s.SyncNodeList, "If true, and --cloud_provider is specified, sync nodes from the cloud provider. Default true.")
fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controler send probes to kubelets and update NodeStatus.")
fs.BoolVar(&s.SyncNodeStatus, "sync_node_status", s.SyncNodeStatus, "Should node controller send probes to kubelets and update NodeStatus.")
// TODO: Discover these by pinging the host machines, and rip out these flags.
// TODO: in the meantime, use resource.QuantityFlag() instead of these
fs.Int64Var(&s.NodeMilliCPU, "node_milli_cpu", s.NodeMilliCPU, "The amount of MilliCPU provisioned on each node")

View File

@ -34,6 +34,20 @@ import (
"github.com/golang/glog"
)
const (
// This constant is used if sync_node_status=false. NodeController will not proactively
// sync node status in this case, but will monitor node status posted by kubelet. If
// it doesn't receive an update for this amount of time, it will start posting a NotReady
// condition for the node. The amount of time before NodeController starts evicting pods
// is controlled via the flag 'pod_eviction_timeout'.
nodeMonitorGracePeriod = 8 * time.Second
// This constant is used if sync_node_status=false. It controls the NodeController monitoring
// period, i.e. how often NodeController checks node status posted by kubelet.
// This value should be lower than nodeMonitorGracePeriod.
// TODO: Change node status monitor to watch based.
nodeMonitorPeriod = 5 * time.Second
)
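The constants above only matter relative to each other and to the pod eviction timeout: kubelet must refresh the Ready condition within nodeMonitorGracePeriod or the controller posts NotReady, and once the last Ready probe is older than pod_eviction_timeout the node's pods are evicted. A minimal, self-contained sketch of that timing model (the types and helper names here are illustrative, not part of the controller):

package main

import (
	"fmt"
	"time"
)

// Illustrative values mirroring the controller's defaults.
const (
	monitorGracePeriod = 8 * time.Second
	podEvictionTimeout = 5 * time.Minute
)

// nodeTimes is a stand-in for the relevant parts of a node's status.
type nodeTimes struct {
	lastProbeTime time.Time // last time kubelet posted any Ready condition
	lastReadyTime time.Time // last time the Ready condition was ConditionFull
}

// classify reports whether the node should be marked NotReady and whether its
// pods should be evicted, given the current time.
func classify(n nodeTimes, now time.Time) (markNotReady, evictPods bool) {
	markNotReady = now.After(n.lastProbeTime.Add(monitorGracePeriod))
	evictPods = now.After(n.lastReadyTime.Add(podEvictionTimeout))
	return
}

func main() {
	now := time.Now()
	stale := nodeTimes{lastProbeTime: now.Add(-time.Minute), lastReadyTime: now.Add(-10 * time.Minute)}
	fmt.Println(classify(stale, now)) // true true: mark NotReady and evict pods
}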
var (
ErrRegistration = errors.New("unable to register all nodes.")
ErrQueryIPAddress = errors.New("unable to query IP address.")
@ -53,8 +67,6 @@ type NodeController struct {
}
// NewNodeController returns a new node controller to sync instances from cloudprovider.
// TODO: NodeController health checker should be a separate package other than
// kubeletclient, node health check != kubelet health check.
func NewNodeController(
cloud cloudprovider.Interface,
matchRE string,
@ -77,36 +89,34 @@ func NewNodeController(
}
}
// Run creates initial node list and start syncing instances from cloudprovider if any.
// Run creates the initial node list and starts syncing instances from the cloudprovider, if any.
// It also starts syncing cluster node status.
// 1. RegisterNodes() is called only once to register all initial nodes (from cloudprovider
// or from command line flag). To make cluster bootstrap faster, node controller populates
// node addresses.
// 2. SyncCloud() is called periodically (if enabled) to sync instances from cloudprovider.
// 2. SyncCloudNodes() is called periodically (if enabled) to sync instances from cloudprovider.
// Node created here will only have specs.
// 3. SyncNodeStatus() is called periodically (if enabled) to sync node status for nodes in
// k8s cluster.
// k8s cluster. If not enabled, MonitorNodeStatus() is called instead to monitor node
// status posted by kubelet.
func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus bool) {
// Register initial set of nodes with their status set.
var nodes *api.NodeList
var err error
if s.isRunningCloudProvider() {
if syncNodeList {
nodes, err = s.GetCloudNodesWithSpec()
if err != nil {
if nodes, err = s.GetCloudNodesWithSpec(); err != nil {
glog.Errorf("Error loading initial node from cloudprovider: %v", err)
}
} else {
nodes = &api.NodeList{}
}
} else {
nodes, err = s.GetStaticNodesWithSpec()
if err != nil {
if nodes, err = s.GetStaticNodesWithSpec(); err != nil {
glog.Errorf("Error loading initial static nodes: %v", err)
}
}
nodes, err = s.PopulateAddresses(nodes)
if err != nil {
if nodes, err = s.PopulateAddresses(nodes); err != nil {
glog.Errorf("Error getting nodes ips: %v", err)
}
if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil {
@ -116,26 +126,25 @@ func (s *NodeController) Run(period time.Duration, syncNodeList, syncNodeStatus
// Start syncing node list from cloudprovider.
if syncNodeList && s.isRunningCloudProvider() {
go util.Forever(func() {
if err = s.SyncCloud(); err != nil {
if err = s.SyncCloudNodes(); err != nil {
glog.Errorf("Error syncing cloud: %v", err)
}
}, period)
}
// Start syncing or monitoring node status.
if syncNodeStatus {
// Start syncing node status.
go util.Forever(func() {
if err = s.SyncNodeStatus(); err != nil {
glog.Errorf("Error syncing status: %v", err)
}
}, period)
} else {
// Start checking node reachability and evicting timeouted pods.
go util.Forever(func() {
if err = s.EvictTimeoutedPods(); err != nil {
glog.Errorf("Error evicting timeouted pods: %v", err)
if err = s.MonitorNodeStatus(); err != nil {
glog.Errorf("Error monitoring node status: %v", err)
}
}, period)
}, nodeMonitorPeriod)
}
}
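Run's wiring boils down to: register once, then run each sync loop in its own goroutine on its own period (util.Forever in the real code). A rough, self-contained sketch of that pattern with local stand-ins (none of these names are the controller's own):

package main

import (
	"log"
	"time"
)

// forever is a local stand-in for util.Forever: run f, sleep, repeat.
func forever(f func(), period time.Duration) {
	for {
		f()
		time.Sleep(period)
	}
}

// Stand-ins for the controller's sync methods; the real ones return errors the same way.
func syncCloudNodes() error    { return nil }
func monitorNodeStatus() error { return nil }

func main() {
	const (
		nodeSyncPeriod    = 10 * time.Second
		nodeMonitorPeriod = 5 * time.Second
	)
	// 1. Node registration would happen once, up front (omitted here).
	// 2. Periodically sync the node list from the cloud provider.
	go forever(func() {
		if err := syncCloudNodes(); err != nil {
			log.Printf("Error syncing cloud: %v", err)
		}
	}, nodeSyncPeriod)
	// 3. Periodically monitor node status posted by kubelets.
	go forever(func() {
		if err := monitorNodeStatus(); err != nil {
			log.Printf("Error monitoring node status: %v", err)
		}
	}, nodeMonitorPeriod)
	select {} // block forever, as the controller manager's main loop does
}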
@ -173,8 +182,8 @@ func (s *NodeController) RegisterNodes(nodes *api.NodeList, retryCount int, retr
}
}
// SyncCloud synchronizes the list of instances from cloudprovider to master server.
func (s *NodeController) SyncCloud() error {
// SyncCloudNodes synchronizes the list of instances from cloudprovider to master server.
func (s *NodeController) SyncCloudNodes() error {
matches, err := s.GetCloudNodesWithSpec()
if err != nil {
return err
@ -220,8 +229,7 @@ func (s *NodeController) SyncNodeStatus() error {
if err != nil {
return err
}
nodes = s.UpdateNodesStatus(nodes)
nodes, err = s.PopulateAddresses(nodes)
nodes, err = s.PopulateNodesStatus(nodes)
if err != nil {
return err
}
@ -238,90 +246,25 @@ func (s *NodeController) SyncNodeStatus() error {
return nil
}
// EvictTimeoutedPods verifies if nodes are reachable by checking the time of last probe
// and deletes pods from not reachable nodes.
func (s *NodeController) EvictTimeoutedPods() error {
nodes, err := s.kubeClient.Nodes().List()
if err != nil {
return err
}
for _, node := range nodes.Items {
if util.Now().After(latestReadyTime(&node).Add(s.podEvictionTimeout)) {
s.deletePods(node.Name)
}
}
return nil
}
func latestReadyTime(node *api.Node) util.Time {
readyTime := node.ObjectMeta.CreationTimestamp
for _, condition := range node.Status.Conditions {
if condition.Type == api.NodeReady &&
condition.Status == api.ConditionFull &&
condition.LastProbeTime.After(readyTime.Time) {
readyTime = condition.LastProbeTime
}
}
return readyTime
}
// PopulateAddresses queries Address for given list of nodes.
func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
if s.isRunningCloudProvider() {
instances, ok := s.cloud.Instances()
if !ok {
return nodes, ErrCloudInstance
}
for i := range nodes.Items {
node := &nodes.Items[i]
nodeAddresses, err := instances.NodeAddresses(node.Name)
if err != nil {
glog.Errorf("error getting instance addresses for %s: %v", node.Name, err)
} else {
node.Status.Addresses = nodeAddresses
}
}
} else {
for i := range nodes.Items {
node := &nodes.Items[i]
addr := net.ParseIP(node.Name)
if addr != nil {
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()}
node.Status.Addresses = []api.NodeAddress{address}
} else {
addrs, err := s.lookupIP(node.Name)
if err != nil {
glog.Errorf("Can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
glog.Errorf("No ip address for node %v", node.Name)
} else {
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}
node.Status.Addresses = []api.NodeAddress{address}
}
}
}
}
return nodes, nil
}
// UpdateNodesStatus performs various condition checks for given list of nodes.
func (s *NodeController) UpdateNodesStatus(nodes *api.NodeList) *api.NodeList {
// PopulateNodesStatus populates node status for given list of nodes.
func (s *NodeController) PopulateNodesStatus(nodes *api.NodeList) (*api.NodeList, error) {
var wg sync.WaitGroup
wg.Add(len(nodes.Items))
for i := range nodes.Items {
go func(node *api.Node) {
node.Status.Conditions = s.DoCheck(node)
if err := s.updateNodeInfo(node); err != nil {
if err := s.populateNodeInfo(node); err != nil {
glog.Errorf("Can't collect information for node %s: %v", node.Name, err)
}
wg.Done()
}(&nodes.Items[i])
}
wg.Wait()
return nodes
return s.PopulateAddresses(nodes)
}
func (s *NodeController) updateNodeInfo(node *api.Node) error {
// populateNodeInfo gets node info from kubelet and updates the node.
func (s *NodeController) populateNodeInfo(node *api.Node) error {
nodeInfo, err := s.kubeletClient.GetNodeInfo(node.Name)
if err != nil {
return err
@ -341,7 +284,6 @@ func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition {
oldReadyCondition := s.getCondition(node, api.NodeReady)
newReadyCondition := s.checkNodeReady(node)
s.updateLastTransitionTime(oldReadyCondition, newReadyCondition)
if newReadyCondition.Status != api.ConditionFull {
// Node is not ready for this probe, we need to check if pods need to be deleted.
if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) {
@ -421,30 +363,95 @@ func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition {
}
}
// deletePods will delete all pods from master running on given node.
func (s *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
// TODO: We don't yet have field selectors from client, see issue #1362.
pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
// PopulateAddresses queries and fills in addresses for the given list of nodes.
func (s *NodeController) PopulateAddresses(nodes *api.NodeList) (*api.NodeList, error) {
if s.isRunningCloudProvider() {
instances, ok := s.cloud.Instances()
if !ok {
return nodes, ErrCloudInstance
}
for i := range nodes.Items {
node := &nodes.Items[i]
nodeAddresses, err := instances.NodeAddresses(node.Name)
if err != nil {
glog.Errorf("error getting instance addresses for %s: %v", node.Name, err)
} else {
node.Status.Addresses = nodeAddresses
}
}
} else {
for i := range nodes.Items {
node := &nodes.Items[i]
addr := net.ParseIP(node.Name)
if addr != nil {
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addr.String()}
node.Status.Addresses = []api.NodeAddress{address}
} else {
addrs, err := s.lookupIP(node.Name)
if err != nil {
glog.Errorf("Can't get ip address of node %s: %v", node.Name, err)
} else if len(addrs) == 0 {
glog.Errorf("No ip address for node %v", node.Name)
} else {
address := api.NodeAddress{Type: api.NodeLegacyHostIP, Address: addrs[0].String()}
node.Status.Addresses = []api.NodeAddress{address}
}
}
}
}
return nodes, nil
}
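The non-cloud branch above falls back from "the node name is already an IP" to a DNS lookup. A small, self-contained sketch of that fallback using the standard net package (the helper name is illustrative, not the controller's API):

package main

import (
	"fmt"
	"net"
)

// resolveNodeAddress mirrors the non-cloud branch of PopulateAddresses:
// use the node name directly if it parses as an IP, otherwise fall back to DNS.
func resolveNodeAddress(nodeName string) (string, error) {
	if ip := net.ParseIP(nodeName); ip != nil {
		return ip.String(), nil
	}
	addrs, err := net.LookupIP(nodeName)
	if err != nil {
		return "", fmt.Errorf("can't get ip address of node %s: %v", nodeName, err)
	}
	if len(addrs) == 0 {
		return "", fmt.Errorf("no ip address for node %s", nodeName)
	}
	return addrs[0].String(), nil
}

func main() {
	fmt.Println(resolveNodeAddress("127.0.0.1")) // already an IP, returned as-is
	fmt.Println(resolveNodeAddress("localhost")) // resolved via DNS lookup
}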
// MonitorNodeStatus verifies that node status is constantly updated by kubelet, and if
// not, posts a NotReady condition for the node. It also evicts all pods if the node is
// not ready for a long period of time.
func (s *NodeController) MonitorNodeStatus() error {
nodes, err := s.kubeClient.Nodes().List()
if err != nil {
return err
}
for _, pod := range pods.Items {
if pod.Status.Host != nodeID {
continue
for i := range nodes.Items {
node := &nodes.Items[i]
// Precompute condition times to avoid a deep copy of node status (we'll modify the node
// in place for updating, and NodeStatus.Conditions is a slice, so a plain assignment would not copy it).
latestConditionTime := s.latestConditionTime(node, api.NodeReady)
latestFullConditionTime := s.latestConditionTimeWithStatus(node, api.NodeReady, api.ConditionFull)
// If the grace period has passed, post a NotReady condition for the node to the master, without contacting kubelet.
if util.Now().After(latestConditionTime.Add(nodeMonitorGracePeriod)) {
readyCondition := s.getCondition(node, api.NodeReady)
if readyCondition == nil {
node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
Type: api.NodeReady,
Status: api.ConditionNone,
Reason: fmt.Sprintf("Kubelet never posted node status"),
LastProbeTime: util.Now(),
LastTransitionTime: util.Now(),
})
} else {
// Only bump LastTransitionTime when the status actually transitions from Full to None,
// so check the old status before overwriting it.
if readyCondition.Status == api.ConditionFull {
readyCondition.LastTransitionTime = util.Now()
}
readyCondition.Status = api.ConditionNone
readyCondition.Reason = fmt.Sprintf("Kubelet stopped posting node status")
readyCondition.LastProbeTime = util.Now()
}
glog.V(2).Infof("updating node %v, whose status hasn't been updated by kubelet for a long time", node.Name)
_, err = s.kubeClient.Nodes().Update(node)
if err != nil {
glog.Errorf("error updating node %s: %v", node.Name, err)
}
}
glog.V(2).Infof("Delete pod %v", pod.Name)
if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil {
glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
// Eviction timeout! Evict all pods on the unhealthy node.
if util.Now().After(latestFullConditionTime.Add(s.podEvictionTimeout)) {
s.deletePods(node.Name)
}
}
return nil
}
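One subtlety in the update above is that LastTransitionTime should only move when the Ready status actually flips from Full to None, so the old status has to be checked before it is overwritten. A compact sketch of that rule with local stand-in types (not the api package):

package main

import (
	"fmt"
	"time"
)

type condition struct {
	Status             string
	Reason             string
	LastProbeTime      time.Time
	LastTransitionTime time.Time
}

// markNotReady updates c the way the monitor loop does once the grace period has
// passed: always refresh the probe time, but only bump the transition time when
// the status actually changes from "Full" to "None".
func markNotReady(c *condition, now time.Time) {
	if c.Status == "Full" {
		c.LastTransitionTime = now
	}
	c.Status = "None"
	c.Reason = "Kubelet stopped posting node status"
	c.LastProbeTime = now
}

func main() {
	now := time.Now()
	c := condition{Status: "Full", LastTransitionTime: now.Add(-time.Hour)}
	markNotReady(&c, now)
	fmt.Println(c.Status, c.LastTransitionTime.Equal(now)) // None true
}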
// GetStaticNodesWithSpec constructs and returns api.NodeList for static nodes. If error
// occurs, an empty NodeList will be returned with a non-nil error info. The
// method only constructs spec fields for nodes.
// occurs, an empty NodeList will be returned with a non-nil error info. The method only
// constructs spec fields for nodes.
func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
result := &api.NodeList{}
for _, nodeID := range s.nodes {
@ -458,8 +465,8 @@ func (s *NodeController) GetStaticNodesWithSpec() (*api.NodeList, error) {
}
// GetCloudNodesWithSpec constructs and returns api.NodeList from cloudprovider. If error
// occurs, an empty NodeList will be returned with a non-nil error info. The
// method only constructs spec fields for nodes.
// occurs, an empty NodeList will be returned with a non-nil error info. The method only
// constructs spec fields for nodes.
func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
result := &api.NodeList{}
instances, ok := s.cloud.Instances()
@ -494,6 +501,27 @@ func (s *NodeController) GetCloudNodesWithSpec() (*api.NodeList, error) {
return result, nil
}
// deletePods will delete all pods from master running on given node.
func (s *NodeController) deletePods(nodeID string) error {
glog.V(2).Infof("Delete all pods from %v", nodeID)
// TODO: We don't yet have field selectors from client, see issue #1362.
pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything())
if err != nil {
return err
}
for _, pod := range pods.Items {
if pod.Status.Host != nodeID {
continue
}
glog.V(2).Infof("Delete pod %v", pod.Name)
if err := s.kubeClient.Pods(pod.Namespace).Delete(pod.Name); err != nil {
glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
}
}
return nil
}
// isRunningCloudProvider checks if cluster is running with cloud provider.
func (s *NodeController) isRunningCloudProvider() bool {
return s.cloud != nil && len(s.matchRE) > 0
@ -517,3 +545,30 @@ func (s *NodeController) getCondition(node *api.Node, conditionType api.NodeCond
}
return nil
}
// latestConditionTime returns the latest condition timestamp for the node, regardless of condition status.
// If nothing matches, the node creation timestamp will be returned.
func (s *NodeController) latestConditionTime(node *api.Node, conditionType api.NodeConditionType) util.Time {
readyTime := node.ObjectMeta.CreationTimestamp
for _, condition := range node.Status.Conditions {
if condition.Type == conditionType &&
condition.LastProbeTime.After(readyTime.Time) {
readyTime = condition.LastProbeTime
}
}
return readyTime
}
// latestConditionTimeWithStatus returns the latest condition timestamp for the node, with given condition status.
// If nothing matches, the node creation timestamp will be returned.
func (s *NodeController) latestConditionTimeWithStatus(node *api.Node, conditionType api.NodeConditionType, conditionStatus api.ConditionStatus) util.Time {
readyTime := node.ObjectMeta.CreationTimestamp
for _, condition := range node.Status.Conditions {
if condition.Type == conditionType &&
condition.Status == conditionStatus &&
condition.LastProbeTime.After(readyTime.Time) {
readyTime = condition.LastProbeTime
}
}
return readyTime
}
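Both helpers above reduce to the same scan: take the newest LastProbeTime among conditions that match the type (and optionally the status), falling back to the node's creation timestamp. A minimal illustration with local types (not the api package):

package main

import (
	"fmt"
	"time"
)

type cond struct {
	Type, Status  string
	LastProbeTime time.Time
}

// latestProbeTime returns the newest LastProbeTime among conditions of the given
// type (and status, if non-empty), defaulting to the creation timestamp.
func latestProbeTime(created time.Time, conds []cond, condType, status string) time.Time {
	latest := created
	for _, c := range conds {
		if c.Type != condType || (status != "" && c.Status != status) {
			continue
		}
		if c.LastProbeTime.After(latest) {
			latest = c.LastProbeTime
		}
	}
	return latest
}

func main() {
	created := time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)
	conds := []cond{{Type: "Ready", Status: "Full", LastProbeTime: time.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC)}}
	fmt.Println(latestProbeTime(created, conds, "Ready", "Full")) // 2013-01-01 00:00:00 +0000 UTC
}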

View File

@ -39,7 +39,7 @@ import (
)
// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
// allows test cases to have fine-grained control over mock behaviors. We alos need
// allows test cases to have fine-grained control over mock behaviors. We also need
// PodsInterface and PodInterface to test list & delete pods, which is implemented in
// the embedded client.Fake field.
type FakeNodeHandler struct {
@ -377,7 +377,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
}
}
func TestSyncCloud(t *testing.T) {
func TestSyncCloudNodes(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeCloud *fake_cloud.FakeCloud
@ -464,7 +464,7 @@ func TestSyncCloud(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
if err := nodeController.SyncCloud(); err != nil {
if err := nodeController.SyncCloudNodes(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
@ -485,7 +485,7 @@ func TestSyncCloud(t *testing.T) {
}
}
func TestSyncCloudDeletePods(t *testing.T) {
func TestSyncCloudNodesEvictPods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeCloud *fake_cloud.FakeCloud
@ -546,7 +546,7 @@ func TestSyncCloudDeletePods(t *testing.T) {
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
if err := nodeController.SyncCloud(); err != nil {
if err := nodeController.SyncCloudNodes(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
@ -688,6 +688,106 @@ func TestPopulateNodeAddresses(t *testing.T) {
}
}
func TestSyncNodeStatus(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient
fakeCloud *fake_cloud.FakeCloud
expectedNodes []*api.Node
expectedRequestCount int
}{
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
},
fakeKubeletClient: &FakeKubeletClient{
Status: probe.Success,
Err: nil,
},
fakeCloud: &fake_cloud.FakeCloud{
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}},
},
expectedNodes: []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
},
{
Type: api.NodeSchedulable,
Status: api.ConditionFull,
Reason: "Node is schedulable by default",
},
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
},
},
},
{
ObjectMeta: api.ObjectMeta{Name: "node1"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
},
{
Type: api.NodeSchedulable,
Status: api.ConditionFull,
Reason: "Node is schedulable by default",
},
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
},
},
},
},
expectedRequestCount: 3, // List + 2xUpdate
},
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
for i := range item.fakeNodeHandler.UpdatedNodes {
conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions
for j := range conditions {
if conditions[j].LastTransitionTime.IsZero() {
t.Errorf("unexpected zero last transition timestamp")
}
if conditions[j].LastProbeTime.IsZero() {
t.Errorf("unexpected zero last probe timestamp")
}
conditions[j].LastTransitionTime = util.Time{}
conditions[j].LastProbeTime = util.Time{}
}
}
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
}
// Second sync will also update the node.
item.fakeNodeHandler.RequestCount = 0
if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
}
}
func TestSyncNodeStatusTransitionTime(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
@ -794,119 +894,7 @@ func TestSyncNodeStatusTransitionTime(t *testing.T) {
}
}
func TestEvictTimeoutedPods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
expectedRequestCount int
expectedActions []client.FakeAction
}{
// Node created long time ago, with no status.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
// Node created recently, with no status.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Now(),
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: nil,
},
// Node created long time ago, with status updated long time ago.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}},
},
// Node created long time ago, with status updated recently.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
LastProbeTime: util.Now(),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedActions: nil,
},
}
for _, item := range table {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
if err := nodeController.EvictTimeoutedPods(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) {
t.Errorf("actions differs, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions)
}
}
}
func TestSyncNodeStatusDeletePods(t *testing.T) {
func TestSyncNodeStatusEvictPods(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient
@ -1056,77 +1044,153 @@ func TestSyncNodeStatusDeletePods(t *testing.T) {
}
}
func TestSyncNodeStatus(t *testing.T) {
func TestMonitorNodeStatus(t *testing.T) {
table := []struct {
fakeNodeHandler *FakeNodeHandler
fakeKubeletClient *FakeKubeletClient
fakeCloud *fake_cloud.FakeCloud
expectedNodes []*api.Node
expectedRequestCount int
expectedEvictPods bool
expectedNodes []*api.Node
}{
// Node created long time ago, with no status.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{newNode("node0"), newNode("node1")},
},
fakeKubeletClient: &FakeKubeletClient{
Status: probe.Success,
Err: nil,
},
fakeCloud: &fake_cloud.FakeCloud{
Addresses: []api.NodeAddress{{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"}},
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 2, // List+Update
expectedEvictPods: true,
expectedNodes: []*api.Node{
{
ObjectMeta: api.ObjectMeta{Name: "node0"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
},
{
Type: api.NodeSchedulable,
Status: api.ConditionFull,
Reason: "Node is schedulable by default",
},
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
},
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
{
ObjectMeta: api.ObjectMeta{Name: "node1"},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok",
Type: api.NodeReady,
Status: api.ConditionNone,
Reason: fmt.Sprintf("Kubelet never posted node status"),
LastProbeTime: util.Time{},
LastTransitionTime: util.Time{},
},
{
Type: api.NodeSchedulable,
Status: api.ConditionFull,
Reason: "Node is schedulable by default",
},
},
Addresses: []api.NodeAddress{
{Type: api.NodeLegacyHostIP, Address: "1.2.3.4"},
},
},
},
},
expectedRequestCount: 3, // List + 2xUpdate
},
// Node created recently, with no status.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Now(),
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedEvictPods: false,
expectedNodes: nil,
},
// Node created long time ago, with status updated long time ago.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
LastProbeTime: util.Date(2013, 1, 1, 0, 0, 0, 0, time.UTC),
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 2, // List+Update
expectedEvictPods: true,
expectedNodes: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionNone,
Reason: fmt.Sprintf("Kubelet stop posting node status"),
LastProbeTime: util.Time{},
LastTransitionTime: util.Time{},
},
},
},
},
},
},
// Node created long time ago, with status updated recently.
{
fakeNodeHandler: &FakeNodeHandler{
Existing: []*api.Node{
{
ObjectMeta: api.ObjectMeta{
Name: "node0",
CreationTimestamp: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC),
},
Status: api.NodeStatus{
Conditions: []api.NodeCondition{
{
Type: api.NodeReady,
Status: api.ConditionFull,
LastProbeTime: util.Now(),
LastTransitionTime: util.Time{},
},
},
},
},
},
Fake: client.Fake{
PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}},
},
},
expectedRequestCount: 1, // List
expectedEvictPods: false,
expectedNodes: nil,
},
}
for _, item := range table {
nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
if err := nodeController.SyncNodeStatus(); err != nil {
nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
if err := nodeController.MonitorNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
if item.expectedRequestCount != item.fakeNodeHandler.RequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
}
for i := range item.fakeNodeHandler.UpdatedNodes {
@ -1145,13 +1209,14 @@ func TestSyncNodeStatus(t *testing.T) {
if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) {
t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0])
}
// Second sync will also update the node.
item.fakeNodeHandler.RequestCount = 0
if err := nodeController.SyncNodeStatus(); err != nil {
t.Errorf("unexpected error: %v", err)
podEvicted := false
for _, action := range item.fakeNodeHandler.Actions {
if action.Action == "delete-pod" {
podEvicted = true
}
}
if item.fakeNodeHandler.RequestCount != item.expectedRequestCount {
t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount)
if item.expectedEvictPods != podEvicted {
t.Errorf("expected pod eviction: %+v, got %+v", item.expectedEvictPods, podEvicted)
}
}
}