adding pods to DeletePods parameters

This commit is contained in:
Krzysztof Siedlecki 2019-09-30 13:55:19 +02:00
parent c5440829d5
commit 8f48896709
4 changed files with 44 additions and 19 deletions

View File

@ -9,6 +9,7 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle", importpath = "k8s.io/kubernetes/pkg/controller/nodelifecycle",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [
"//pkg/apis/core:go_default_library",
"//pkg/controller:go_default_library", "//pkg/controller:go_default_library",
"//pkg/controller/nodelifecycle/scheduler:go_default_library", "//pkg/controller/nodelifecycle/scheduler:go_default_library",
"//pkg/controller/util/node:go_default_library", "//pkg/controller/util/node:go_default_library",
@ -23,6 +24,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/equality:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",

View File

@ -34,6 +34,7 @@ import (
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
@ -51,6 +52,7 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/client-go/util/flowcontrol" "k8s.io/client-go/util/flowcontrol"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
apicore "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler" "k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler"
nodeutil "k8s.io/kubernetes/pkg/controller/util/node" nodeutil "k8s.io/kubernetes/pkg/controller/util/node"
@ -673,7 +675,12 @@ func (nc *Controller) doEvictionPass() {
klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err) klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
} }
nodeUID, _ := value.UID.(string) nodeUID, _ := value.UID.(string)
remaining, err := nodeutil.DeletePods(nc.kubeClient, nc.recorder, value.Value, nodeUID, nc.daemonSetStore) pods, err := listPodsFromNode(nc.kubeClient, value.Value)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err))
return false, 0
}
remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
if err != nil { if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err)) utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
return false, 0 return false, 0
@ -693,6 +700,16 @@ func (nc *Controller) doEvictionPass() {
} }
} }
func listPodsFromNode(kubeClient clientset.Interface, nodeName string) ([]v1.Pod, error) {
selector := fields.OneTermEqualSelector(apicore.PodHostField, nodeName).String()
options := metav1.ListOptions{FieldSelector: selector}
pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(options)
if err != nil {
return nil, err
}
return pods.Items, nil
}
// monitorNodeHealth verifies node health are constantly updated by kubelet, and // monitorNodeHealth verifies node health are constantly updated by kubelet, and
// if not, post "NodeReady==ConditionUnknown". // if not, post "NodeReady==ConditionUnknown".
// For nodes who are not ready or not reachable for a long period of time. // For nodes who are not ready or not reachable for a long period of time.

View File

@ -91,7 +91,11 @@ func (nc *nodeLifecycleController) doEviction(fakeNodeHandler *testutil.FakeNode
for _, zone := range zones { for _, zone := range zones {
nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nc.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
uid, _ := value.UID.(string) uid, _ := value.UID.(string)
nodeutil.DeletePods(fakeNodeHandler, nc.recorder, value.Value, uid, nc.daemonSetStore) pods, err := listPodsFromNode(fakeNodeHandler, value.Value)
if err != nil {
return false, 0
}
nodeutil.DeletePods(fakeNodeHandler, pods, nc.recorder, value.Value, uid, nc.daemonSetStore)
return true, 0 return true, 0
}) })
} }
@ -693,7 +697,11 @@ func TestMonitorNodeHealthEvictPods(t *testing.T) {
if _, ok := nodeController.zonePodEvictor[zone]; ok { if _, ok := nodeController.zonePodEvictor[zone]; ok {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUID, _ := value.UID.(string) nodeUID, _ := value.UID.(string)
nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister()) pods, err := listPodsFromNode(item.fakeNodeHandler, value.Value)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeutil.DeletePods(item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetInformer.Lister())
return true, 0 return true, 0
}) })
} else { } else {
@ -846,7 +854,11 @@ func TestPodStatusChange(t *testing.T) {
for _, zone := range zones { for _, zone := range zones {
nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) { nodeController.zonePodEvictor[zone].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
nodeUID, _ := value.UID.(string) nodeUID, _ := value.UID.(string)
nodeutil.DeletePods(item.fakeNodeHandler, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore) pods, err := listPodsFromNode(item.fakeNodeHandler, value.Value)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
nodeutil.DeletePods(item.fakeNodeHandler, pods, nodeController.recorder, value.Value, nodeUID, nodeController.daemonSetStore)
return true, 0 return true, 0
}) })
} }

View File

@ -44,32 +44,26 @@ import (
// DeletePods will delete all pods from master running on given node, // DeletePods will delete all pods from master running on given node,
// and return true if any pods were deleted, or were found pending // and return true if any pods were deleted, or were found pending
// deletion. // deletion.
func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) { func DeletePods(kubeClient clientset.Interface, pods []v1.Pod, recorder record.EventRecorder, nodeName, nodeUID string, daemonStore appsv1listers.DaemonSetLister) (bool, error) {
remaining := false remaining := false
selector := fields.OneTermEqualSelector(api.PodHostField, nodeName).String()
options := metav1.ListOptions{FieldSelector: selector}
pods, err := kubeClient.CoreV1().Pods(metav1.NamespaceAll).List(options)
var updateErrList []error var updateErrList []error
if err != nil { if len(pods) > 0 {
return remaining, err
}
if len(pods.Items) > 0 {
RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName)) RecordNodeEvent(recorder, nodeName, nodeUID, v1.EventTypeNormal, "DeletingAllPods", fmt.Sprintf("Deleting all Pods from Node %v.", nodeName))
} }
for _, pod := range pods.Items { for i := range pods {
pod := &pods[i]
// Defensive check, also needed for tests. // Defensive check, also needed for tests.
if pod.Spec.NodeName != nodeName { if pod.Spec.NodeName != nodeName {
continue continue
} }
// Set reason and message in the pod object. // Set reason and message in the pod object.
if _, err = SetPodTerminationReason(kubeClient, &pod, nodeName); err != nil { if _, err := SetPodTerminationReason(kubeClient, pod, nodeName); err != nil {
if apierrors.IsConflict(err) { if apierrors.IsConflict(err) {
updateErrList = append(updateErrList, updateErrList = append(updateErrList,
fmt.Errorf("update status failed for pod %q: %v", format.Pod(&pod), err)) fmt.Errorf("update status failed for pod %q: %v", format.Pod(pod), err))
continue continue
} }
} }
@ -79,13 +73,13 @@ func DeletePods(kubeClient clientset.Interface, recorder record.EventRecorder, n
continue continue
} }
// if the pod is managed by a daemonset, ignore it // if the pod is managed by a daemonset, ignore it
_, err := daemonStore.GetPodDaemonSets(&pod) if _, err := daemonStore.GetPodDaemonSets(pod); err == nil {
if err == nil { // No error means at least one daemonset was found // No error means at least one daemonset was found
continue continue
} }
klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name) klog.V(2).Infof("Starting deletion of pod %v/%v", pod.Namespace, pod.Name)
recorder.Eventf(&pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName) recorder.Eventf(pod, v1.EventTypeNormal, "NodeControllerEviction", "Marking for deletion Pod %s from Node %s", pod.Name, nodeName)
if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil { if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
return false, err return false, err
} }