From 2201e756664b8594bf1f59f9d67d340c55dae529 Mon Sep 17 00:00:00 2001
From: gmarek
Date: Tue, 4 Aug 2015 14:44:14 +0200
Subject: [PATCH] NodeController small cleanup

---
 cmd/integration/integration.go              |   2 +-
 .../app/controllermanager.go                |   2 +-
 cmd/kubernetes/kubernetes.go                |   2 +-
 .../controllermanager/controllermanager.go  |   2 +-
 docs/admin/kube-controller-manager.md       |   1 -
 pkg/controller/node/nodecontroller.go       | 336 +++++++++---------
 pkg/controller/node/nodecontroller_test.go  |   4 +-
 7 files changed, 172 insertions(+), 177 deletions(-)

diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 125d488375a..64323d1c39b 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -196,7 +196,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
 	// TODO: Write an integration test for the replication controllers watch.
 	go controllerManager.Run(3, util.NeverStop)
 
-	nodeController := nodecontroller.NewNodeController(nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
+	nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewFakeRateLimiter()),
 		40*time.Second, 60*time.Second, 5*time.Second, nil, false)
 	nodeController.Run(5 * time.Second)
 	cadvisorInterface := new(cadvisor.Fake)
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index 58508a683a3..5817509d7a0 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -194,7 +194,7 @@ func (s *CMServer) Run(_ []string) error {
 		glog.Fatalf("Cloud provider could not be initialized: %v", err)
 	}
 
-	nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.RegisterRetryCount,
+	nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
 		s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
 		s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod)
diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go
index c0d7b2ba1eb..c1188b2c72c 100644
--- a/cmd/kubernetes/kubernetes.go
+++ b/cmd/kubernetes/kubernetes.go
@@ -131,7 +131,7 @@ func runControllerManager(cl *client.Client) {
 	const serviceSyncPeriod = 5 * time.Minute
 	const nodeSyncPeriod = 10 * time.Second
 	nodeController := nodecontroller.NewNodeController(
-		nil, cl, 10, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst)),
+		nil, cl, 5*time.Minute, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst)),
 		40*time.Second, 60*time.Second, 5*time.Second, nil, false)
 	nodeController.Run(nodeSyncPeriod)
 
diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go
index a0d016333b1..9a4b141cb7e 100644
--- a/contrib/mesos/pkg/controllermanager/controllermanager.go
+++ b/contrib/mesos/pkg/controllermanager/controllermanager.go
@@ -122,7 +122,7 @@ func (s *CMServer) Run(_ []string) error {
 		glog.Fatalf("Cloud provider could not be initialized: %v", err)
 	}
 
-	nodeController := nodecontroller.NewNodeController(cloud, kubeClient, s.RegisterRetryCount,
+	nodeController := nodecontroller.NewNodeController(cloud, kubeClient,
 		s.PodEvictionTimeout, nodecontroller.NewPodEvictor(util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst)),
 		s.NodeMonitorGracePeriod, s.NodeStartupGracePeriod, s.NodeMonitorPeriod, (*net.IPNet)(&s.ClusterCIDR), s.AllocateNodeCIDRs)
 	nodeController.Run(s.NodeSyncPeriod)
diff --git a/docs/admin/kube-controller-manager.md b/docs/admin/kube-controller-manager.md
index a88b7bae791..063ec8f71d3 100644
--- a/docs/admin/kube-controller-manager.md
+++ b/docs/admin/kube-controller-manager.md
@@ -73,7 +73,6 @@ controller, and serviceaccounts controller.
       --port=0: The port that the controller-manager's http service runs on
       --profiling=true: Enable profiling via web interface host:port/debug/pprof/
       --pvclaimbinder-sync-period=0: The period for syncing persistent volumes and persistent volume claims
-      --register-retry-count=0: The number of retries for initial node registration. Retry interval equals node-sync-period.
       --resource-quota-sync-period=0: The period for syncing quota usage status in the system
       --root-ca-file="": If set, this root certificate authority will be included in service account's token secret. This must be a valid PEM-encoded CA bundle.
       --service-account-private-key-file="": Filename containing a PEM-encoded private RSA key used to sign service account tokens.
diff --git a/pkg/controller/node/nodecontroller.go b/pkg/controller/node/nodecontroller.go
index fe8b60e3217..bcd9737cf76 100644
--- a/pkg/controller/node/nodecontroller.go
+++ b/pkg/controller/node/nodecontroller.go
@@ -53,19 +53,13 @@ type nodeStatusData struct {
 }
 
 type NodeController struct {
+	allocateNodeCIDRs       bool
 	cloud                   cloudprovider.Interface
-	kubeClient              client.Interface
-	recorder                record.EventRecorder
-	registerRetryCount      int
-	podEvictionTimeout      time.Duration
+	clusterCIDR             *net.IPNet
 	deletingPodsRateLimiter util.RateLimiter
-	// worker that evicts pods from unresponsive nodes.
-	podEvictor *PodEvictor
-
-	// per Node map storing last observed Status together with a local time when it was observed.
-	// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
-	// to aviod the problem with time skew across the cluster.
-	nodeStatusMap map[string]nodeStatusData
+	kubeClient              client.Interface
+	// Method for easy mocking in unit tests.
+	lookupIP func(host string) ([]net.IP, error)
 	// Value used if sync_nodes_status=False. NodeController will not proactively
 	// sync node status in this case, but will monitor node status updated from kubelet. If
 	// it doesn't receive update for this amount of time, it will start posting "NodeReady==
@@ -81,25 +75,28 @@ type NodeController struct {
 	// ConditionUnknown". The amount of time before which NodeController start evicting pods
 	// is controlled via flag 'pod_eviction_timeout'.
 	// Note: be cautious when changing the constant, it must work with nodeStatusUpdateFrequency
 	// in kubelet. There are several constraints:
 	// 1. nodeMonitorGracePeriod must be N times more than nodeStatusUpdateFrequency, where
 	//    N means number of retries allowed for kubelet to post node status. It is pointless
 	//    to make nodeMonitorGracePeriod be less than nodeStatusUpdateFrequency, since there
 	//    will only be fresh values from Kubelet at an interval of nodeStatusUpdateFrequency.
 	//    The constant must be less than podEvictionTimeout.
 	// 2. nodeMonitorGracePeriod can't be too large for user experience - larger value takes
 	//    longer for user to see up-to-date node status.
 	nodeMonitorGracePeriod time.Duration
-	// Value used if sync_nodes_status=False, only for node startup. When node
-	// is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
-	nodeStartupGracePeriod time.Duration
 	// Value controlling NodeController monitoring period, i.e. how often does NodeController
 	// check node status posted from kubelet. This value should be lower than nodeMonitorGracePeriod.
 	// TODO: Change node status monitor to watch based.
 	nodeMonitorPeriod time.Duration
-	clusterCIDR *net.IPNet
-	allocateNodeCIDRs bool
-	// Method for easy mocking in unittest.
-	lookupIP func(host string) ([]net.IP, error)
-	now func() util.Time
+	// Value used if sync_nodes_status=False, only for node startup. When node
+	// is just created, e.g. cluster bootstrap or node creation, we give a longer grace period.
+	nodeStartupGracePeriod time.Duration
+	// per Node map storing last observed Status together with a local time when it was observed.
+	// This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
+	// to avoid the problem with time skew across the cluster.
+	nodeStatusMap map[string]nodeStatusData
+	now           func() util.Time
+	// worker that evicts pods from unresponsive nodes.
+	podEvictor         *PodEvictor
+	podEvictionTimeout time.Duration
+	recorder           record.EventRecorder
 }
 
 // NewNodeController returns a new node controller to sync instances from cloudprovider.
 func NewNodeController(
 	cloud cloudprovider.Interface,
 	kubeClient client.Interface,
-	registerRetryCount int,
 	podEvictionTimeout time.Duration,
 	podEvictor *PodEvictor,
 	nodeMonitorGracePeriod time.Duration,
@@ -123,7 +120,6 @@ func NewNodeController(
 		cloud:              cloud,
 		kubeClient:         kubeClient,
 		recorder:           recorder,
-		registerRetryCount: registerRetryCount,
 		podEvictionTimeout: podEvictionTimeout,
 		podEvictor:         podEvictor,
 		nodeStatusMap:      make(map[string]nodeStatusData),
@@ -137,6 +133,44 @@ func NewNodeController(
 	}
 }
 
+// Run starts an asynchronous loop that monitors the status of cluster nodes.
+func (nc *NodeController) Run(period time.Duration) {
+	// Incorporate the results of node status pushed from kubelet to master.
+	go util.Forever(func() {
+		if err := nc.monitorNodeStatus(); err != nil {
+			glog.Errorf("Error monitoring node status: %v", err)
+		}
+	}, nc.nodeMonitorPeriod)
+
+	go util.Forever(func() {
+		nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
+	}, nodeEvictionPeriod)
+}
+
+// deletePods deletes from the master all pods running on the given node.
+func (nc *NodeController) deletePods(nodeID string) error {
+	glog.V(2).Infof("Delete all pods from %v", nodeID)
+	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
+		fields.OneTermEqualSelector(client.PodHost, nodeID))
+	if err != nil {
+		return err
+	}
+	nc.recordNodeEvent(nodeID, fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
+	for _, pod := range pods.Items {
+		// Defensive check, also needed for tests.
+		if pod.Spec.NodeName != nodeID {
+			continue
+		}
+		glog.V(2).Infof("Delete pod %v", pod.Name)
+		nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
+		if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
+			glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
+		}
+	}
+
+	return nil
+}
+
 // Generates num pod CIDRs that could be assigned to nodes.
 func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet {
 	res := util.NewStringSet()
@@ -150,6 +184,108 @@ func generateCIDRs(clusterCIDR *net.IPNet, num int) util.StringSet {
 	return res
 }
 
+// getCondition returns the condition object for the specified condition
+// type, or nil if the condition is not set.
+func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
+	if status == nil {
+		return nil
+	}
+	for i := range status.Conditions {
+		if status.Conditions[i].Type == conditionType {
+			return &status.Conditions[i]
+		}
+	}
+	return nil
+}
+
+// monitorNodeStatus verifies that node statuses are constantly updated by the kubelet
+// and, if they are not, posts "NodeReady==ConditionUnknown". It also evicts all pods
+// from a node if it is not ready or not reachable for a long period of time.
+func (nc *NodeController) monitorNodeStatus() error {
+	nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
+	if err != nil {
+		return err
+	}
+	if nc.allocateNodeCIDRs {
+		// TODO (cjcullen): Use pkg/controller/framework to watch nodes and
+		// reduce lists/decouple this from monitoring status.
+		nc.reconcileNodeCIDRs(nodes)
+	}
+	for i := range nodes.Items {
+		var gracePeriod time.Duration
+		var lastReadyCondition api.NodeCondition
+		var readyCondition *api.NodeCondition
+		node := &nodes.Items[i]
+		for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
+			gracePeriod, lastReadyCondition, readyCondition, err = nc.tryUpdateNodeStatus(node)
+			if err == nil {
+				break
+			}
+			name := node.Name
+			node, err = nc.kubeClient.Nodes().Get(name)
+			if err != nil {
+				glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
+				break
+			}
+		}
+		if err != nil {
+			glog.Errorf("Update status of Node %v from NodeController exceeds retry count. "+
+				"Skipping - no pods will be evicted.", node.Name)
+			continue
+		}
+
+		decisionTimestamp := nc.now()
+
+		if readyCondition != nil {
+			// Check eviction timeout against decisionTimestamp
+			if lastReadyCondition.Status == api.ConditionFalse &&
+				decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
+				if nc.podEvictor.AddNodeToEvict(node.Name) {
+					glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
+				}
+			}
+			if lastReadyCondition.Status == api.ConditionUnknown &&
+				decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
+				if nc.podEvictor.AddNodeToEvict(node.Name) {
+					glog.Infof("Adding pods to evict (status unknown): %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].probeTimestamp, nc.podEvictionTimeout-gracePeriod)
+				}
+			}
+			if lastReadyCondition.Status == api.ConditionTrue {
+				if nc.podEvictor.RemoveNodeToEvict(node.Name) {
+					glog.Infof("Pods on %v won't be evicted", node.Name)
+				}
+			}
+
+			// Report node event.
+			if readyCondition.Status != api.ConditionTrue && lastReadyCondition.Status == api.ConditionTrue {
+				nc.recordNodeStatusChange(node, "NodeNotReady")
+			}
+
+			// Check with the cloud provider to see if the node still exists. If it
+			// doesn't, delete the node and all pods scheduled on the node.
+			if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
+				instances, ok := nc.cloud.Instances()
+				if !ok {
+					glog.Errorf("%v", ErrCloudInstance)
+					continue
+				}
+				if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
+					glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
+					nc.recordNodeEvent(node.Name, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
+					if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
+						glog.Errorf("Unable to delete node %s: %v", node.Name, err)
+						continue
+					}
+					if err := nc.deletePods(node.Name); err != nil {
+						glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
+					}
+				}
+			}
+		}
+	}
+	return nil
+}
+
 // reconcileNodeCIDRs looks at each node and assigns it a valid CIDR
 // if it doesn't currently have one.
 func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
@@ -179,18 +315,15 @@ func (nc *NodeController) reconcileNodeCIDRs(nodes *api.NodeList) {
 	}
 }
 
-// Run starts an asynchronous loop that monitors the status of cluster nodes.
-func (nc *NodeController) Run(period time.Duration) {
-	// Incorporate the results of node status pushed from kubelet to master.
-	go util.Forever(func() {
-		if err := nc.monitorNodeStatus(); err != nil {
-			glog.Errorf("Error monitoring node status: %v", err)
-		}
-	}, nc.nodeMonitorPeriod)
-
-	go util.Forever(func() {
-		nc.podEvictor.TryEvict(func(nodeName string) { nc.deletePods(nodeName) })
-	}, nodeEvictionPeriod)
+func (nc *NodeController) recordNodeEvent(nodeName string, event string) {
+	ref := &api.ObjectReference{
+		Kind:      "Node",
+		Name:      nodeName,
+		UID:       types.UID(nodeName),
+		Namespace: "",
+	}
+	glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
+	nc.recorder.Eventf(ref, event, "Node %s event: %s", nodeName, event)
 }
 
 func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status string) {
@@ -206,17 +339,6 @@ func (nc *NodeController) recordNodeStatusChange(node *api.Node, new_status stri
 	nc.recorder.Eventf(ref, new_status, "Node %s status is now: %s", node.Name, new_status)
 }
 
-func (nc *NodeController) recordNodeEvent(nodeName string, event string) {
-	ref := &api.ObjectReference{
-		Kind:      "Node",
-		Name:      nodeName,
-		UID:       types.UID(nodeName),
-		Namespace: "",
-	}
-	glog.V(2).Infof("Recording %s event message for node %s", event, nodeName)
-	nc.recorder.Eventf(ref, event, "Node %s event: %s", nodeName, event)
-}
-
 // For a given node checks its conditions and tries to update it. Returns grace period to which given node
 // is entitled, state of current and last observed Ready Condition, and an error if it ocured.
 func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, api.NodeCondition, *api.NodeCondition, error) {
@@ -348,129 +470,3 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 
 	return gracePeriod, lastReadyCondition, readyCondition, err
 }
-
-// monitorNodeStatus verifies node status are constantly updated by kubelet, and if not,
-// post "NodeReady==ConditionUnknown". It also evicts all pods if node is not ready or
-// not reachable for a long period of time.
-func (nc *NodeController) monitorNodeStatus() error {
-	nodes, err := nc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
-	if err != nil {
-		return err
-	}
-	if nc.allocateNodeCIDRs {
-		// TODO (cjcullen): Use pkg/controller/framework to watch nodes and
-		// reduce lists/decouple this from monitoring status.
-		nc.reconcileNodeCIDRs(nodes)
-	}
-	for i := range nodes.Items {
-		var gracePeriod time.Duration
-		var lastReadyCondition api.NodeCondition
-		var readyCondition *api.NodeCondition
-		node := &nodes.Items[i]
-		for rep := 0; rep < nodeStatusUpdateRetry; rep++ {
-			gracePeriod, lastReadyCondition, readyCondition, err = nc.tryUpdateNodeStatus(node)
-			if err == nil {
-				break
-			}
-			name := node.Name
-			node, err = nc.kubeClient.Nodes().Get(name)
-			if err != nil {
-				glog.Errorf("Failed while getting a Node to retry updating NodeStatus. Probably Node %s was deleted.", name)
-				break
-			}
-		}
-		if err != nil {
-			glog.Errorf("Update status of Node %v from NodeController exceeds retry count."+
-				"Skipping - no pods will be evicted.", node.Name)
-			continue
-		}
-
-		decisionTimestamp := nc.now()
-
-		if readyCondition != nil {
-			// Check eviction timeout against decisionTimestamp
-			if lastReadyCondition.Status == api.ConditionFalse &&
-				decisionTimestamp.After(nc.nodeStatusMap[node.Name].readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
-				if nc.podEvictor.AddNodeToEvict(node.Name) {
-					glog.Infof("Adding pods to evict: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout)
-				}
-			}
-			if lastReadyCondition.Status == api.ConditionUnknown &&
-				decisionTimestamp.After(nc.nodeStatusMap[node.Name].probeTimestamp.Add(nc.podEvictionTimeout-gracePeriod)) {
-				if nc.podEvictor.AddNodeToEvict(node.Name) {
-					glog.Infof("Adding pods to evict2: %v is later than %v + %v", decisionTimestamp, nc.nodeStatusMap[node.Name].readyTransitionTimestamp, nc.podEvictionTimeout-gracePeriod)
-				}
-			}
-			if lastReadyCondition.Status == api.ConditionTrue {
-				if nc.podEvictor.RemoveNodeToEvict(node.Name) {
-					glog.Infof("Pods on %v won't be evicted", node.Name)
-				}
-			}
-
-			// Report node event.
-			if readyCondition.Status != api.ConditionTrue && lastReadyCondition.Status == api.ConditionTrue {
-				nc.recordNodeStatusChange(node, "NodeNotReady")
-			}
-
-			// Check with the cloud provider to see if the node still exists. If it
-			// doesn't, delete the node and all pods scheduled on the node.
-			if readyCondition.Status != api.ConditionTrue && nc.cloud != nil {
-				instances, ok := nc.cloud.Instances()
-				if !ok {
-					glog.Errorf("%v", ErrCloudInstance)
-					continue
-				}
-				if _, err := instances.ExternalID(node.Name); err != nil && err == cloudprovider.InstanceNotFound {
-					glog.Infof("Deleting node (no longer present in cloud provider): %s", node.Name)
-					nc.recordNodeEvent(node.Name, fmt.Sprintf("Deleting Node %v because it's not present according to cloud provider", node.Name))
-					if err := nc.kubeClient.Nodes().Delete(node.Name); err != nil {
-						glog.Errorf("Unable to delete node %s: %v", node.Name, err)
-						continue
-					}
-					if err := nc.deletePods(node.Name); err != nil {
-						glog.Errorf("Unable to delete pods from node %s: %v", node.Name, err)
-					}
-				}
-			}
-		}
-	}
-	return nil
-}
-
-// deletePods will delete all pods from master running on given node.
-func (nc *NodeController) deletePods(nodeID string) error {
-	glog.V(2).Infof("Delete all pods from %v", nodeID)
-	pods, err := nc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(),
-		fields.OneTermEqualSelector(client.PodHost, nodeID))
-	if err != nil {
-		return err
-	}
-	nc.recordNodeEvent(nodeID, fmt.Sprintf("Deleting all Pods from Node %v.", nodeID))
-	for _, pod := range pods.Items {
-		// Defensive check, also needed for tests.
-		if pod.Spec.NodeName != nodeID {
-			continue
-		}
-		glog.V(2).Infof("Delete pod %v", pod.Name)
-		nc.recorder.Eventf(&pod, "NodeControllerEviction", "Deleting Pod %s from Node %s", pod.Name, nodeID)
-		if err := nc.kubeClient.Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
-			glog.Errorf("Error deleting pod %v: %v", pod.Name, err)
-		}
-	}
-
-	return nil
-}
-
-// getCondition returns a condition object for the specific condition
-// type, nil if the condition is not set.
-func (nc *NodeController) getCondition(status *api.NodeStatus, conditionType api.NodeConditionType) *api.NodeCondition {
-	if status == nil {
-		return nil
-	}
-	for i := range status.Conditions {
-		if status.Conditions[i].Type == conditionType {
-			return &status.Conditions[i]
-		}
-	}
-	return nil
-}
diff --git a/pkg/controller/node/nodecontroller_test.go b/pkg/controller/node/nodecontroller_test.go
index 11bfeb263ae..a94cafcbf34 100644
--- a/pkg/controller/node/nodecontroller_test.go
+++ b/pkg/controller/node/nodecontroller_test.go
@@ -325,7 +325,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 
 	for _, item := range table {
 		podEvictor := NewPodEvictor(util.NewFakeRateLimiter())
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 10,
+		nodeController := NewNodeController(nil, item.fakeNodeHandler,
 			evictionTimeout, podEvictor, testNodeMonitorGracePeriod,
 			testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 		nodeController.now = func() util.Time { return fakeNow }
@@ -531,7 +531,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	}
 
 	for _, item := range table {
-		nodeController := NewNodeController(nil, item.fakeNodeHandler, 10, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
+		nodeController := NewNodeController(nil, item.fakeNodeHandler, 5*time.Minute, NewPodEvictor(util.NewFakeRateLimiter()),
 			testNodeMonitorGracePeriod, testNodeStartupGracePeriod, testNodeMonitorPeriod, nil, false)
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.monitorNodeStatus(); err != nil {
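--

Note: the caller-visible change in this patch is that NewNodeController no longer takes a registerRetryCount argument; status-update retries are instead bounded internally by the nodeStatusUpdateRetry constant in monitorNodeStatus. A minimal sketch of the call-site migration, using the argument values from the call sites above (kubeClient and podEvictor are assumed to be already constructed; the durations shown are illustrative, not required values):

    // Before this patch: the third argument was the registration retry count.
    //   nc := nodecontroller.NewNodeController(nil, kubeClient, 10, 5*time.Minute, podEvictor,
    //       40*time.Second, 60*time.Second, 5*time.Second, nil, false)
    //
    // After this patch: the retry count is dropped; the remaining arguments keep their order
    // (podEvictionTimeout, podEvictor, nodeMonitorGracePeriod, nodeStartupGracePeriod,
    // nodeMonitorPeriod, clusterCIDR, allocateNodeCIDRs).
    nc := nodecontroller.NewNodeController(nil, kubeClient, 5*time.Minute, podEvictor,
        40*time.Second, 60*time.Second, 5*time.Second, nil, false)
    nc.Run(10 * time.Second)

Dropping the parameter removes a knob that no longer had a consumer: node registration is no longer driven by this controller, so the retry count (and its --register-retry-count flag) had nothing left to configure.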