diff --git a/pkg/controller/nodelifecycle/BUILD b/pkg/controller/nodelifecycle/BUILD
index b0ac82c6737..c4a9a16af21 100644
--- a/pkg/controller/nodelifecycle/BUILD
+++ b/pkg/controller/nodelifecycle/BUILD
@@ -9,6 +9,7 @@ go_library(
     importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle",
     visibility = ["//visibility:public"],
     deps = [
+        "//pkg/apis/core:go_default_library",
         "//pkg/controller:go_default_library",
         "//pkg/controller/nodelifecycle/scheduler:go_default_library",
         "//pkg/controller/util/node:go_default_library",
@@ -23,6 +24,7 @@ go_library(
         "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller.go b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
index 4441b3de27c..c81e859951a 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller.go
@@ -34,6 +34,7 @@ import (
 	apiequality "k8s.io/apimachinery/pkg/api/equality"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
 	"k8s.io/apimachinery/pkg/labels"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -51,6 +52,7 @@ import (
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/flowcontrol"
 	"k8s.io/client-go/util/workqueue"
+	apicore "k8s.io/kubernetes/pkg/apis/core"
 	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
 	nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
@@ -673,7 +675,12 @@ func (nc *Controller) doEvictionPass() {
 				klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
 			}
 			nodeUID, _ := value.UID.(string)
-			remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
+			pods, err := listPodsFromNode(nc.kubeClient, value.Value)
+			if err != nil {
+				utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err))
+				return false, 0
+			}
+			remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
 			if err != nil {
 				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
 				return false, 0
@@ -693,6 +700,16 @@ func (nc *Controller) doEvictionPass() {
 	}
 }
 
+func listPodsFromNode(kubeClient clientset.Interface, nodeName string) ([]v1.Pod, error) {
+	selector := fields.OneTermEqualSelector(apicore.PodHostField, nodeName).String()
+	options := metav1.ListOptions{FieldSelector: selector}
+	pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(options)
+	if err != nil {
+		return nil, err
+	}
+	return pods.Items, nil
+}
+
 // monitorNodeHealth verifies node health are constantly updated by kubelet, and
 // if not, post "NodeReady==ConditionUnknown".
 // For nodes who are not ready or not reachable for a long period of time.
@@ -773,7 +790,12 @@ func (nc *Controller) monitorNodeHealth() error {
 			// Report node event.
 			if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
 				nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
-				if err = nodeutil.MarkAllPodsNotReady(nc.kubeClient, node); err != nil {
+				pods, err := listPodsFromNode(nc.kubeClient, node.Name)
+				if err != nil {
+					utilruntime.HandleError(fmt.Errorf("Unable to list pods from node %v: %v", node.Name, err))
+					continue
+				}
+				if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, node.Name); err != nil {
 					utilruntime.HandleError(fmt.Errorf("Unable to mark all pods NotReady on node %v: %v", node.Name, err))
 				}
 			}
diff --git a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
index 8165c3b4060..1360d9cc8d5 100644
--- a/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
+++ b/pkg/controller/nodelifecycle/node_lifecycle_controller_test.go
@@ -91,7 +91,11 @@ func (nc *nodeLifecycleController) doEviction(fakeNodeHandler *testutil.FakeNode
 	for _, zone := range zones {
 		nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
 			uid, _ := value.UID.(string)
-			nodeutil.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore)
+			pods, err := listPodsFromNode(fakeNodeHandler, value.Value)
+			if err != nil {
+				return false, 0
+			}
+			nodeutil.DeletePods(fakeNodeHandler, pods, nc.recorder, value.Value, uid, nc.daemonSetStore)
 			return true, 0
 		})
 	}
@@ -693,7 +697,11 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
 		if _, ok := nodeController.zonePodEvictor[zone]; ok {
 			nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
 				nodeUID, _ := value.UID.(string)
-				nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
+				pods, err := listPodsFromNode(item.fakeNodeHandler, value.Value)
+				if err != nil {
+					t.Errorf("unexpected error: %v", err)
+				}
+				nodeutil.DeletePods(item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
 				return true, 0
 			})
 		} else {
@@ -846,7 +854,11 @@ func TestPodStatusChange(t *testing.T) {
 		for _, zone := range zones {
 			nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
 				nodeUID, _ := value.UID.(string)
-				nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
+				pods, err := listPodsFromNode(item.fakeNodeHandler, value.Value)
+				if err != nil {
+					t.Errorf("unexpected error: %v", err)
+				}
+				nodeutil.DeletePods(item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
 				return true, 0
 			})
 		}
diff --git a/pkg/controller/util/node/BUILD b/pkg/controller/util/node/BUILD
index c875ccf674c..fbb36ecca73 100644
--- a/pkg/controller/util/node/BUILD
+++ b/pkg/controller/util/node/BUILD
@@ -7,14 +7,12 @@ go_library(
     visibility = ["//visibility:public"],
     deps = [
         "//pkg/api/v1/pod:go_default_library",
-        "//pkg/apis/core:go_default_library",
         "//pkg/controller:go_default_library",
         "//pkg/kubelet/util/format:go_default_library",
         "//pkg/util/node:go_default_library",
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
-        "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", diff --git a/pkg/controller/util/node/controller_utils.go b/pkg/controller/util/node/controller_utils.go index da17b5696b5..e804a4f8872 100644 --- a/pkg/controller/util/node/controller_utils.go +++ b/pkg/controller/util/node/controller_utils.go @@ -22,7 +22,6 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/types" utilerrors "k8s.io/apimachinery/pkg/util/errors" utilruntime "k8s.io/apimachinery/pkg/util/runtime" @@ -33,7 +32,6 @@ import ( clientset "k8s.io/client-go/kubernetes" appsv1listers "k8s.io/client-go/listers/apps/v1" utilpod "k8s.io/kubernetes/pkg/api/v1/pod" - api "k8s.io/kubernetes/pkg/apis/core" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/kubelet/util/format" nodepkg "k8s.io/kubernetes/pkg/util/node" @@ -44,32 +42,26 @@ import ( // DeletePods will delete all pods from master running on given node, // and return true if any pods were deleted, or were found pending // deletion. -func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) { +func DeletePods(kubeClient clientset.Interface, pods []v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) { remaining := false - selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String() - options := metav1.ListOptions{FieldSelector: selector} - pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(options) var updateErrList []error - if err != nil { - return remaining, err - } - - if len(pods.Items) > 0 { + if len(pods) > 0 { RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) } - for _, pod := range pods.Items { + for i := range pods { + pod := &pods[i] // Defensive check, also needed for tests. if pod.Spec.NodeName != nodeName { continue } // Set reason and message in the pod object. 
- if _, err = SetPodTerminationReason(kubeClient, &pod, nodeName); err != nil { + if _, err := SetPodTerminationReason(kubeClient, pod, nodeName); err != nil { if apierrors.IsConflict(err) { updateErrList = append(updateErrList, - fmt.Errorf("update status failed for pod %q: %v", format.Pod(&pod), err)) + fmt.Errorf("update status failed for pod %q: %v", format.Pod(pod), err)) continue } } @@ -79,13 +71,13 @@ func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n continue } // if the pod is managed by a daemonset, ignore it - _, err := daemonStore.GetPodDaemonSets(&pod) - if err == nil { // No error means at least one daemonset was found + if _, err := daemonStore.GetPodDaemonSets(pod); err == nil { + // No error means at least one daemonset was found continue } klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name) - recorder.Eventf(&pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) + recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { return false, err } @@ -117,19 +109,13 @@ func SetPodTerminationReason(kubeClient clientset.Interface, pod *v1.Pod, nodeNa return updatedPod, nil } -// MarkAllPodsNotReady updates ready status of all pods running on +// MarkPodsNotReady updates ready status of given pods running on // given node from master return true if success -func MarkAllPodsNotReady(kubeClient clientset.Interface, node *v1.Node) error { - nodeName := node.Name +func MarkPodsNotReady(kubeClient clientset.Interface, pods []v1.Pod, nodeName string) error { klog.V(2).Infof("Update ready status of pods on node [%v]", nodeName) - opts := metav1.ListOptions{FieldSelector: fields.OneTermEqualSelector(api.PodHostField, nodeName).String()} - pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(opts) - if err != nil { - return err - } errMsg := []string{} - for _, pod := range pods.Items { + for _, pod := range pods { // Defensive check, also needed for tests. if pod.Spec.NodeName != nodeName { continue