diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 9bc19d3c290..3612586ac96 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -202,8 +202,9 @@ func startComponents(manifestURL string) (apiServerURL string) { controllerManager.Run(10 * time.Minute) nodeResources := &api.NodeResources{} - nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}) - nodeController.Run(5*time.Second, 10, true) + + nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute) + nodeController.Run(5*time.Second, true) // Kubelet (localhost) testRootDir := makeTempDirOrDie("kubelet_integ_1.") diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index a81209d750f..39a950be88a 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -123,8 +123,9 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, }, } kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort} - nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient) - nodeController.Run(10*time.Second, 10, true) + + nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute) + nodeController.Run(10*time.Second, true) endpoints := service.NewEndpointController(cl) go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) diff --git a/pkg/api/types.go b/pkg/api/types.go index be7844e0e27..eecb95882eb 100644 --- a/pkg/api/types.go +++ b/pkg/api/types.go @@ -774,6 +774,7 @@ const ( type NodeCondition struct { Kind NodeConditionKind `json:"kind"` Status NodeConditionStatus `json:"status"` + LastProbeTime util.Time `json:"lastProbeTime,omitempty"` LastTransitionTime util.Time `json:"lastTransitionTime,omitempty"` Reason string `json:"reason,omitempty"` Message string `json:"message,omitempty"` diff --git a/pkg/api/v1beta1/types.go b/pkg/api/v1beta1/types.go index 23b2b5c14a1..1556d99dcf8 100644 --- a/pkg/api/v1beta1/types.go +++ b/pkg/api/v1beta1/types.go @@ -619,6 +619,7 @@ const ( type NodeCondition struct { Kind NodeConditionKind `json:"kind" description:"kind of the condition, one of reachable, ready"` Status NodeConditionStatus `json:"status" description:"status of the condition, one of full, none, unknown"` + LastProbeTime util.Time `json:"lastProbeTime,omitempty" description:"last time the condition was probed"` LastTransitionTime util.Time `json:"lastTransitionTime,omitempty" description:"last time the condition transit from one status to another"` Reason string `json:"reason,omitempty" description:"(brief) reason for the condition's last transition"` Message string `json:"message,omitempty" description:"human readable message indicating details about last transition"` diff --git a/pkg/api/v1beta2/types.go b/pkg/api/v1beta2/types.go index 8d042ceee8c..fbf796ae80c 100644 --- a/pkg/api/v1beta2/types.go +++ b/pkg/api/v1beta2/types.go @@ -583,6 +583,7 @@ const ( type NodeCondition struct { Kind NodeConditionKind `json:"kind" description:"kind of the condition, one of reachable, ready"` Status NodeConditionStatus `json:"status" description:"status of the condition, one of full, none, unknown"` + LastProbeTime util.Time `json:"lastProbeTime,omitempty" description:"last time the condition was probed"` LastTransitionTime util.Time 
`json:"lastTransitionTime,omitempty" description:"last time the condition transit from one status to another"` Reason string `json:"reason,omitempty" description:"(brief) reason for the condition's last transition"` Message string `json:"message,omitempty" description:"human readable message indicating details about last transition"` diff --git a/pkg/api/v1beta3/types.go b/pkg/api/v1beta3/types.go index 89ae8ccd025..01cf93679cb 100644 --- a/pkg/api/v1beta3/types.go +++ b/pkg/api/v1beta3/types.go @@ -808,6 +808,7 @@ const ( type NodeCondition struct { Kind NodeConditionKind `json:"kind"` Status NodeConditionStatus `json:"status"` + LastProbeTime util.Time `json:"lastProbeTime,omitempty"` LastTransitionTime util.Time `json:"lastTransitionTime,omitempty"` Reason string `json:"reason,omitempty"` Message string `json:"message,omitempty"` diff --git a/pkg/cloudprovider/controller/nodecontroller.go b/pkg/cloudprovider/controller/nodecontroller.go index feb64a7c608..2b8dec28c30 100644 --- a/pkg/cloudprovider/controller/nodecontroller.go +++ b/pkg/cloudprovider/controller/nodecontroller.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "net" - "reflect" "strings" "sync" "time" @@ -29,6 +28,7 @@ import ( apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/probe" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/golang/glog" @@ -41,12 +41,14 @@ var ( ) type NodeController struct { - cloud cloudprovider.Interface - matchRE string - staticResources *api.NodeResources - nodes []string - kubeClient client.Interface - kubeletClient client.KubeletHealthChecker + cloud cloudprovider.Interface + matchRE string + staticResources *api.NodeResources + nodes []string + kubeClient client.Interface + kubeletClient client.KubeletHealthChecker + registerRetryCount int + podEvictionTimeout time.Duration } // NewNodeController returns a new node controller to sync instances from cloudprovider. @@ -58,20 +60,24 @@ func NewNodeController( nodes []string, staticResources *api.NodeResources, kubeClient client.Interface, - kubeletClient client.KubeletHealthChecker) *NodeController { + kubeletClient client.KubeletHealthChecker, + registerRetryCount int, + podEvictionTimeout time.Duration) *NodeController { return &NodeController{ - cloud: cloud, - matchRE: matchRE, - nodes: nodes, - staticResources: staticResources, - kubeClient: kubeClient, - kubeletClient: kubeletClient, + cloud: cloud, + matchRE: matchRE, + nodes: nodes, + staticResources: staticResources, + kubeClient: kubeClient, + kubeletClient: kubeletClient, + registerRetryCount: registerRetryCount, + podEvictionTimeout: podEvictionTimeout, } } // Run creates initial node list and start syncing instances from cloudprovider if any. // It also starts syncing cluster node status. -func (s *NodeController) Run(period time.Duration, retryCount int, syncNodeList bool) { +func (s *NodeController) Run(period time.Duration, syncNodeList bool) { // Register intial set of nodes with their status set. 
var nodes *api.NodeList var err error @@ -95,7 +101,7 @@ func (s *NodeController) Run(period time.Duration, retryCount int, syncNodeList if err != nil { glog.Errorf("Error getting nodes ips: %v", err) } - if err = s.RegisterNodes(nodes, retryCount, period); err != nil { + if err = s.RegisterNodes(nodes, s.registerRetryCount, period); err != nil { glog.Errorf("Error registrying node list %+v: %v", nodes, err) } @@ -180,6 +186,7 @@ func (s *NodeController) SyncCloud() error { if err != nil { glog.Errorf("Delete node error: %s", nodeID) } + s.deletePods(nodeID) } return nil @@ -191,20 +198,15 @@ func (s *NodeController) SyncNodeStatus() error { if err != nil { return err } - oldNodes := make(map[string]api.Node) - for _, node := range nodes.Items { - oldNodes[node.Name] = node - } nodes = s.DoChecks(nodes) nodes, err = s.PopulateIPs(nodes) if err != nil { return err } for _, node := range nodes.Items { - if reflect.DeepEqual(node, oldNodes[node.Name]) { - glog.V(2).Infof("skip updating node %v", node.Name) - continue - } + // We used to skip updating the node when its status didn't change. This is no longer + // useful after introducing per-probe status fields, e.g. 'LastProbeTime', which will + // differ on every call of the sync loop. glog.V(2).Infof("updating node %v", node.Name) _, err = s.kubeClient.Nodes().Update(&node) if err != nil { @@ -273,40 +275,78 @@ func (s *NodeController) DoCheck(node *api.Node) []api.NodeCondition { oldReadyCondition := s.getCondition(node, api.NodeReady) newReadyCondition := s.checkNodeReady(node) if oldReadyCondition != nil && oldReadyCondition.Status == newReadyCondition.Status { + // If node status doesn't change, the transition time is the same as last time. newReadyCondition.LastTransitionTime = oldReadyCondition.LastTransitionTime } else { + // Set transition time to Now() if node status changes or `oldReadyCondition` is nil, which + // happens only when the node is checked for the first time. newReadyCondition.LastTransitionTime = util.Now() } + + if newReadyCondition.Status != api.ConditionFull { + // Node is not ready for this probe, so check whether pods need to be deleted. + if newReadyCondition.LastProbeTime.After(newReadyCondition.LastTransitionTime.Add(s.podEvictionTimeout)) { + // As long as the node keeps failing, we keep deleting all of its pods. Node controller sync + // is not a closed-loop process; there is no feedback from other components regarding pod + // status, so re-listing pods acts as a sanity check that they have all been deleted. + s.deletePods(node.Name) + } + } + conditions = append(conditions, *newReadyCondition) return conditions } -// checkNodeReady checks raw node ready condition, without timestamp set. +// checkNodeReady checks raw node ready condition, without transition timestamp set.
func (s *NodeController) checkNodeReady(node *api.Node) *api.NodeCondition { switch status, err := s.kubeletClient.HealthCheck(node.Name); { case err != nil: glog.V(2).Infof("NodeController: node %s health check error: %v", node.Name, err) return &api.NodeCondition{ - Kind: api.NodeReady, - Status: api.ConditionUnknown, - Reason: fmt.Sprintf("Node health check error: %v", err), + Kind: api.NodeReady, + Status: api.ConditionUnknown, + Reason: fmt.Sprintf("Node health check error: %v", err), + LastProbeTime: util.Now(), } case status == probe.Failure: return &api.NodeCondition{ - Kind: api.NodeReady, - Status: api.ConditionNone, - Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"), + Kind: api.NodeReady, + Status: api.ConditionNone, + Reason: fmt.Sprintf("Node health check failed: kubelet /healthz endpoint returns not ok"), + LastProbeTime: util.Now(), } default: return &api.NodeCondition{ - Kind: api.NodeReady, - Status: api.ConditionFull, - Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"), + Kind: api.NodeReady, + Status: api.ConditionFull, + Reason: fmt.Sprintf("Node health check succeeded: kubelet /healthz endpoint returns ok"), + LastProbeTime: util.Now(), } } } +// deletePods deletes from the master all pods running on the given node. +func (s *NodeController) deletePods(nodeID string) error { + glog.V(2).Infof("Delete all pods from %v", nodeID) + // TODO: We don't yet have field selectors from client, see issue #1362. + pods, err := s.kubeClient.Pods(api.NamespaceAll).List(labels.Everything()) + if err != nil { + return err + } + for _, pod := range pods.Items { + if pod.Status.Host != nodeID { + continue + } + glog.V(2).Infof("Delete pod %v", pod.Name) + if err := s.kubeClient.Pods(api.NamespaceAll).Delete(pod.Name); err != nil { + glog.Errorf("Error deleting pod %v", pod.Name) + } + } + + return nil +} + // StaticNodes constructs and returns api.NodeList for static nodes. If error // occurs, an empty NodeList will be returned with a non-nil error info. func (s *NodeController) StaticNodes() (*api.NodeList, error) { diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go index 86ee4efce0c..9c7abfbfec9 100644 --- a/pkg/cloudprovider/controller/nodecontroller_test.go +++ b/pkg/cloudprovider/controller/nodecontroller_test.go @@ -33,10 +33,12 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. +// FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It +// allows test cases to have fine-grained control over mock behaviors. We also need +// PodsInterface and PodInterface to test listing & deleting pods, which is implemented in +// the embedded client.Fake field.
type FakeNodeHandler struct { client.Fake - client.FakeNodes // Input: Hooks determine if request is valid or not CreateHook func(*FakeNodeHandler, *api.Node) bool @@ -69,6 +71,10 @@ func (m *FakeNodeHandler) Create(node *api.Node) (*api.Node, error) { } } +func (m *FakeNodeHandler) Get(name string) (*api.Node, error) { + return nil, nil +} + func (m *FakeNodeHandler) List() (*api.NodeList, error) { defer func() { m.RequestCount++ }() var nodes []*api.Node @@ -224,7 +230,7 @@ func TestRegisterNodes(t *testing.T) { for _, machine := range item.machines { nodes.Items = append(nodes.Items, *newNode(machine)) } - nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil) + nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond) if !item.expectedFail && err != nil { t.Errorf("unexpected error: %v", err) @@ -262,10 +268,27 @@ func TestCreateStaticNodes(t *testing.T) { }, }, }, + { + machines: []string{"node0", "node1"}, + expectedNodes: &api.NodeList{ + Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{}, + }, + { + ObjectMeta: api.ObjectMeta{Name: "node1"}, + Spec: api.NodeSpec{}, + Status: api.NodeStatus{}, + }, + }, + }, + }, } for _, item := range table { - nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil) + nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, nil, nil, 10, time.Minute) nodes, err := nodeController.StaticNodes() if err != nil { t.Errorf("unexpected error: %v", err) @@ -305,10 +328,28 @@ func TestCreateCloudNodes(t *testing.T) { }, }, }, + { + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0", "node1"}, + NodeResources: &api.NodeResources{Capacity: resourceList}, + }, + expectedNodes: &api.NodeList{ + Items: []api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Spec: api.NodeSpec{Capacity: resourceList}, + }, + { + ObjectMeta: api.ObjectMeta{Name: "node1"}, + Spec: api.NodeSpec{Capacity: resourceList}, + }, + }, + }, + }, } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil) + nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute) nodes, err := nodeController.CloudNodes() if err != nil { t.Errorf("unexpected error: %v", err) @@ -329,6 +370,20 @@ func TestSyncCloud(t *testing.T) { expectedDeleted []string }{ { + // 1 existing node, 1 cloud node: do nothing. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0")}, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + }, + matchRE: ".*", + expectedRequestCount: 1, // List + expectedCreated: []string{}, + expectedDeleted: []string{}, + }, + { + // 1 existing node, 2 cloud nodes: create 1. fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{newNode("node0")}, }, @@ -341,6 +396,7 @@ expectedDeleted: []string{}, }, { + // 2 existing nodes, 1 cloud node: delete 1. fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{newNode("node0"), newNode("node1")}, }, @@ -353,6 +409,7 @@ expectedDeleted: []string{"node1"}, }, { + // 1 existing node, 3 cloud nodes but only 2 match regex: delete 1.
fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{newNode("node0")}, }, @@ -367,7 +424,7 @@ func TestSyncCloud(t *testing.T) { } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil) + nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) if err := nodeController.SyncCloud(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -385,6 +442,83 @@ func TestSyncCloud(t *testing.T) { } } +func TestSyncCloudDeletePods(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + fakeCloud *fake_cloud.FakeCloud + matchRE string + expectedRequestCount int + expectedDeleted []string + expectedActions []client.FakeAction + }{ + { + // No node to delete: do nothing. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0"), *newPod("pod1", "node1")}}, + }, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0", "node1"}, + }, + matchRE: ".*", + expectedRequestCount: 1, // List + expectedDeleted: []string{}, + expectedActions: nil, + }, + { + // Delete node1, and pod0 is running on it. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node1")}}, + }, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + }, + matchRE: ".*", + expectedRequestCount: 2, // List + Delete + expectedDeleted: []string{"node1"}, + expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, + }, + { + // Delete node1, but pod0 is running on node0. 
+ fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{newNode("node0"), newNode("node1")}, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + fakeCloud: &fake_cloud.FakeCloud{ + Machines: []string{"node0"}, + }, + matchRE: ".*", + expectedRequestCount: 2, // List + Delete + expectedDeleted: []string{"node1"}, + expectedActions: []client.FakeAction{{Action: "list-pods"}}, + }, + } + + for _, item := range table { + nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute) + if err := nodeController.SyncCloud(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + nodes := sortedNodeNames(item.fakeNodeHandler.DeletedNodes) + if !reflect.DeepEqual(item.expectedDeleted, nodes) { + t.Errorf("expected node list %+v, got %+v", item.expectedDeleted, nodes) + } + if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) { + t.Errorf("unexpected pod actions, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions) + } + } +} + func TestHealthCheckNode(t *testing.T) { table := []struct { node *api.Node @@ -420,7 +554,7 @@ }, }, { - node: newNode("node1"), + node: newNode("node0"), fakeKubeletClient: &FakeKubeletClient{ Status: probe.Failure, Err: errors.New("Error"), @@ -436,13 +570,17 @@ } for _, item := range table { - nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient) + nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute) conditions := nodeController.DoCheck(item.node) for i := range conditions { if conditions[i].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero timestamp") + t.Errorf("unexpected zero last transition timestamp") + } + if conditions[i].LastProbeTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") } conditions[i].LastTransitionTime = util.Time{} + conditions[i].LastProbeTime = util.Time{} } if !reflect.DeepEqual(item.expectedConditions, conditions) { t.Errorf("expected conditions %+v, got %+v", item.expectedConditions, conditions) @@ -470,7 +608,7 @@ } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil) + nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute) result, err := nodeController.PopulateIPs(item.nodes) // In case of IP querying error, we should continue. if err != nil { @@ -484,14 +622,15 @@ } } -func TestNodeStatusTransitionTime(t *testing.T) { +func TestSyncNodeStatusTransitionTime(t *testing.T) { table := []struct { - fakeNodeHandler *FakeNodeHandler - fakeKubeletClient *FakeKubeletClient - expectedNodes []*api.Node - expectedRequestCount int + fakeNodeHandler *FakeNodeHandler + fakeKubeletClient *FakeKubeletClient + expectedRequestCount int + expectedTransitionTimeChange bool }{ { + // Existing node is healthy, current probe is healthy too.
fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ { ObjectMeta: api.ObjectMeta{Name: "node0"}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { Kind: api.NodeReady, Status: api.ConditionFull, Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, }, }, }, }, fakeKubeletClient: &FakeKubeletClient{ Status: probe.Success, Err: nil, }, - expectedNodes: []*api.Node{}, - expectedRequestCount: 1, + expectedRequestCount: 2, // List+Update + expectedTransitionTimeChange: false, }, { + // Existing node is healthy, current probe is unhealthy. fakeNodeHandler: &FakeNodeHandler{ Existing: []*api.Node{ { ObjectMeta: api.ObjectMeta{Name: "node0"}, Status: api.NodeStatus{ Conditions: []api.NodeCondition{ { Kind: api.NodeReady, Status: api.ConditionFull, Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), }, }, }, }, }, }, fakeKubeletClient: &FakeKubeletClient{ Status: probe.Failure, Err: nil, }, - expectedNodes: []*api.Node{ - { - ObjectMeta: api.ObjectMeta{Name: "node0"}, - Status: api.NodeStatus{ - Conditions: []api.NodeCondition{ - { - Kind: api.NodeReady, - Status: api.ConditionFull, - Reason: "Node health check failed: kubelet /healthz endpoint returns not ok", - LastTransitionTime: util.Now(), // Placeholder expected transition time, due to inability to mock time. - }, - }, - }, - }, - }, - expectedRequestCount: 2, + expectedRequestCount: 2, // List+Update + expectedTransitionTimeChange: true, }, } for _, item := range table { - nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient) + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) if err := nodeController.SyncNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -568,14 +694,168 @@ for i := range item.fakeNodeHandler.UpdatedNodes { conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions for j := range conditions { - if !conditions[j].LastTransitionTime.After(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { - t.Errorf("unexpected timestamp %v", conditions[j].LastTransitionTime) + condition := conditions[j] + if item.expectedTransitionTimeChange { + if !condition.LastTransitionTime.After(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { + t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime) + } + } else { + if !condition.LastTransitionTime.Equal(time.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC)) { + t.Errorf("unexpected last transition timestamp %v", condition.LastTransitionTime) + } } } } } } +func TestSyncNodeStatusDeletePods(t *testing.T) { + table := []struct { + fakeNodeHandler *FakeNodeHandler + fakeKubeletClient *FakeKubeletClient + expectedRequestCount int + expectedActions []client.FakeAction + }{ + { + // Existing node is healthy, current probe is healthy too. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Kind: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node1")}}, + }, + }, + fakeKubeletClient: &FakeKubeletClient{ + Status: probe.Success, + Err: nil, + }, + expectedRequestCount: 2, // List+Update + expectedActions: nil, + }, + { + // Existing node is healthy, current probe is unhealthy, i.e. node just becomes unhealthy. + // Do not delete pods.
+ fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Kind: api.NodeReady, + Status: api.ConditionFull, + Reason: "Node health check succeeded: kubelet /healthz endpoint returns ok", + LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + fakeKubeletClient: &FakeKubeletClient{ + Status: probe.Failure, + Err: nil, + }, + expectedRequestCount: 2, // List+Update + expectedActions: nil, + }, + { + // Existing node is unhealthy, current probe is unhealthy. Node is still within the grace period. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Kind: api.NodeReady, + Status: api.ConditionNone, + Reason: "Node health check failed: kubelet /healthz endpoint returns not ok", + // Here, last transition time is Now(). In node controller, the new condition's probe time is + // also Now(). The two calls to Now() yield different times due to test execution, but the + // time difference is within 5 minutes, which is the grace period. + LastTransitionTime: util.Now(), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + fakeKubeletClient: &FakeKubeletClient{ + Status: probe.Failure, + Err: nil, + }, + expectedRequestCount: 2, // List+Update + expectedActions: nil, + }, + { + // Existing node is unhealthy, current probe is unhealthy. Node exceeds the grace period. + fakeNodeHandler: &FakeNodeHandler{ + Existing: []*api.Node{ + { + ObjectMeta: api.ObjectMeta{Name: "node0"}, + Status: api.NodeStatus{ + Conditions: []api.NodeCondition{ + { + Kind: api.NodeReady, + Status: api.ConditionNone, + Reason: "Node health check failed: kubelet /healthz endpoint returns not ok", + // Here, last transition time is in the past, and in node controller, the + // new condition's probe time is Now(). The time difference is larger than + // 5 minutes. The test will fail if the system clock is wrong, but we don't yet have + // ways to mock time in our tests.
+ LastTransitionTime: util.Date(2012, 1, 1, 0, 0, 0, 0, time.UTC), + }, + }, + }, + }, + }, + Fake: client.Fake{ + PodsList: api.PodList{Items: []api.Pod{*newPod("pod0", "node0")}}, + }, + }, + fakeKubeletClient: &FakeKubeletClient{ + Status: probe.Failure, + Err: nil, + }, + expectedRequestCount: 2, // List+Update + expectedActions: []client.FakeAction{{Action: "list-pods"}, {Action: "delete-pod", Value: "pod0"}}, + }, + } + + for _, item := range table { + nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute) + if err := nodeController.SyncNodeStatus(); err != nil { + t.Errorf("unexpected error: %v", err) + } + if item.expectedRequestCount != item.fakeNodeHandler.RequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) + } + if !reflect.DeepEqual(item.expectedActions, item.fakeNodeHandler.Actions) { + t.Errorf("unexpected pod actions, expected %+v, got %+v", item.expectedActions, item.fakeNodeHandler.Actions) + } + } +} + func TestSyncNodeStatus(t *testing.T) { table := []struct { fakeNodeHandler *FakeNodeHandler @@ -628,7 +908,7 @@ } for _, item := range table { - nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient) + nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute) if err := nodeController.SyncNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } @@ -639,20 +919,25 @@ conditions := item.fakeNodeHandler.UpdatedNodes[i].Status.Conditions for j := range conditions { if conditions[j].LastTransitionTime.IsZero() { - t.Errorf("unexpected zero timestamp") + t.Errorf("unexpected zero last transition timestamp") + } + if conditions[j].LastProbeTime.IsZero() { + t.Errorf("unexpected zero last probe timestamp") } conditions[j].LastTransitionTime = util.Time{} + conditions[j].LastProbeTime = util.Time{} } } if !reflect.DeepEqual(item.expectedNodes, item.fakeNodeHandler.UpdatedNodes) { t.Errorf("expected nodes %+v, got %+v", item.expectedNodes[0], item.fakeNodeHandler.UpdatedNodes[0]) } + // Second sync will also update the node.
item.fakeNodeHandler.RequestCount = 0 if err := nodeController.SyncNodeStatus(); err != nil { t.Errorf("unexpected error: %v", err) } - if item.fakeNodeHandler.RequestCount != 1 { - t.Errorf("expected one list for updating same status, but got %v.", item.fakeNodeHandler.RequestCount) + if item.fakeNodeHandler.RequestCount != item.expectedRequestCount { + t.Errorf("expected %v call, but got %v.", item.expectedRequestCount, item.fakeNodeHandler.RequestCount) } } } @@ -661,6 +946,10 @@ func newNode(name string) *api.Node { return &api.Node{ObjectMeta: api.ObjectMeta{Name: name}} } +func newPod(name, host string) *api.Pod { + return &api.Pod{ObjectMeta: api.ObjectMeta{Name: name}, Status: api.PodStatus{Host: host}} +} + func sortedNodeNames(nodes []*api.Node) []string { nodeNames := []string{} for _, node := range nodes { diff --git a/pkg/controllermanager/controllermanager.go b/pkg/controllermanager/controllermanager.go index 8b0586c4ce9..cc1246e5d95 100644 --- a/pkg/controllermanager/controllermanager.go +++ b/pkg/controllermanager/controllermanager.go @@ -55,6 +55,7 @@ type CMServer struct { RegisterRetryCount int MachineList util.StringList SyncNodeList bool + PodEvictionTimeout time.Duration // TODO: Discover these by pinging the host machines, and rip out these params. NodeMilliCPU int64 @@ -63,7 +64,7 @@ KubeletConfig client.KubeletConfig } -// NewCMServer creates a new CMServer with default a default config. +// NewCMServer creates a new CMServer with a default config. func NewCMServer() *CMServer { s := CMServer{ Port: ports.ControllerManagerPort, @@ -71,6 +72,7 @@ NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, RegisterRetryCount: 10, + PodEvictionTimeout: 5 * time.Minute, NodeMilliCPU: 1000, NodeMemory: resource.MustParse("3Gi"), SyncNodeList: true, @@ -110,6 +112,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { "The period for syncing nodes from cloudprovider. Longer periods will result in "+ "fewer calls to cloud provider, but may delay addition of new nodes to cluster.") fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system") + fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.") fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+ "The number of retries for initial node registration. Retry interval equals node_sync_period.") fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.") @@ -169,8 +172,10 @@ func (s *CMServer) Run(_ []string) error { api.ResourceMemory: s.NodeMemory, }, } - nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, kubeClient, kubeletClient) - nodeController.Run(s.NodeSyncPeriod, s.RegisterRetryCount, s.SyncNodeList) + + nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources, + kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout) + nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList) resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient) resourceQuotaManager.Run(s.ResourceQuotaSyncPeriod)
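For reference, a minimal sketch (not part of this patch) of how a caller wires up the controller after this change, mirroring the call sites updated above in cmd/kubernetes and pkg/controllermanager; the package and helper name (startNodeController) and the literal retry count, eviction timeout, and sync period are illustrative assumptions only.

package wiring // hypothetical package, for illustration only

import (
	"time"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
)

// startNodeController shows the reworked wiring: registerRetryCount and podEvictionTimeout
// now travel through NewNodeController, while Run keeps only the sync period and the
// sync-node-list flag.
func startNodeController(cl client.Interface, kubeletClient client.KubeletHealthChecker, machines []string) {
	nodeResources := &api.NodeResources{}
	// A nil cloud provider and an empty matchRE mean the static machine list is used.
	nodeController := nodeControllerPkg.NewNodeController(nil, "", machines, nodeResources, cl, kubeletClient,
		10,            // registerRetryCount: retries for initial node registration
		5*time.Minute) // podEvictionTimeout: grace period before pods on a failed node are deleted
	nodeController.Run(10*time.Second, true) // sync every 10s and keep syncing the node list
}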