Merge pull request #88337 from mgugino-upstream-stage/drain-custom-filters

kubectl/drain add support for custom pod filters
This commit is contained in:
Kubernetes Prow Robot 2020-09-08 04:53:43 -07:00 committed by GitHub
commit cfaa2c4b66
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 178 additions and 88 deletions

View File

@ -65,6 +65,11 @@ type Helper struct {
// won't drain otherwise
SkipWaitForDeleteTimeoutSeconds int
// AdditionalFilters are applied sequentially after base drain filters to
// exclude pods using custom logic. Any filter that returns PodDeleteStatus
// with Delete == false will immediately stop execution of further filters.
AdditionalFilters []PodFilter
Out io.Writer
ErrOut io.Writer
@ -172,7 +177,7 @@ func (d *Helper) EvictPod(pod corev1.Pod, policyGroupVersion string) error {
// or error if it cannot list pods. All pods that are ready to be deleted can be obtained with .Pods(),
// and string with all warning can be obtained with .Warnings(), and .Errors() for all errors that
// occurred during deletion.
func (d *Helper) GetPodsForDeletion(nodeName string) (*podDeleteList, []error) {
func (d *Helper) GetPodsForDeletion(nodeName string) (*PodDeleteList, []error) {
labelSelector, err := labels.Parse(d.PodSelector)
if err != nil {
return nil, []error{err}
@ -185,35 +190,37 @@ func (d *Helper) GetPodsForDeletion(nodeName string) (*podDeleteList, []error) {
return nil, []error{err}
}
pods := []podDelete{}
list := filterPods(podList, d.makeFilters())
if errs := list.errors(); len(errs) > 0 {
return list, errs
}
return list, nil
}
func filterPods(podList *corev1.PodList, filters []PodFilter) *PodDeleteList {
pods := []PodDelete{}
for _, pod := range podList.Items {
var status podDeleteStatus
for _, filter := range d.makeFilters() {
var status PodDeleteStatus
for _, filter := range filters {
status = filter(pod)
if !status.delete {
if !status.Delete {
// short-circuit as soon as pod is filtered out
// at that point, there is no reason to run pod
// through any additional filters
break
}
}
// Add the pod to podDeleteList no matter what podDeleteStatus is,
// those pods whose podDeleteStatus is false like DaemonSet will
// Add the pod to PodDeleteList no matter what PodDeleteStatus is,
// those pods whose PodDeleteStatus is false like DaemonSet will
// be catched by list.errors()
pods = append(pods, podDelete{
pod: pod,
status: status,
pods = append(pods, PodDelete{
Pod: pod,
Status: status,
})
}
list := &podDeleteList{items: pods}
if errs := list.errors(); len(errs) > 0 {
return list, errs
}
return list, nil
list := &PodDeleteList{items: pods}
return list
}
// DeleteOrEvictPods deletes or evicts the pods on the api server

View File

@ -389,3 +389,72 @@ func TestDeleteOrEvict(t *testing.T) {
})
}
}
func mockFilterSkip(_ corev1.Pod) PodDeleteStatus {
return MakePodDeleteStatusSkip()
}
func mockFilterOkay(_ corev1.Pod) PodDeleteStatus {
return MakePodDeleteStatusOkay()
}
func TestFilterPods(t *testing.T) {
tCases := []struct {
description string
expectedPodListLen int
additionalFilters []PodFilter
}{
{
description: "AdditionalFilter skip all",
expectedPodListLen: 0,
additionalFilters: []PodFilter{
mockFilterSkip,
mockFilterOkay,
},
},
{
description: "AdditionalFilter okay all",
expectedPodListLen: 1,
additionalFilters: []PodFilter{
mockFilterOkay,
},
},
{
description: "AdditionalFilter Skip after Okay all skip",
expectedPodListLen: 0,
additionalFilters: []PodFilter{
mockFilterOkay,
mockFilterSkip,
},
},
{
description: "No additionalFilters okay all",
expectedPodListLen: 1,
},
}
for _, tc := range tCases {
t.Run(tc.description, func(t *testing.T) {
h := &Helper{
Force: true,
AdditionalFilters: tc.additionalFilters,
}
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "pod",
Namespace: "default",
},
}
podList := corev1.PodList{
Items: []corev1.Pod{
pod,
},
}
list := filterPods(&podList, h.makeFilters())
podsLen := len(list.Pods())
if podsLen != tc.expectedPodListLen {
t.Errorf("%s: unexpected evictions; actual %v; expected %v", tc.description, podsLen, tc.expectedPodListLen)
}
})
}
}

View File

@ -63,8 +63,8 @@ func TestSkipDeletedFilter(t *testing.T) {
}
podDeleteStatus := h.skipDeletedFilter(pod)
if podDeleteStatus.delete != tc.expectedDelete {
t.Errorf("test %v: unexpected podDeleteStatus.delete; actual %v; expected %v", i, podDeleteStatus.delete, tc.expectedDelete)
if podDeleteStatus.Delete != tc.expectedDelete {
t.Errorf("test %v: unexpected podDeleteStatus.delete; actual %v; expected %v", i, podDeleteStatus.Delete, tc.expectedDelete)
}
}
}

View File

@ -37,30 +37,34 @@ const (
unmanagedWarning = "deleting Pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet"
)
type podDelete struct {
pod corev1.Pod
status podDeleteStatus
// PodDelete informs filtering logic whether a pod should be deleted or not
type PodDelete struct {
Pod corev1.Pod
Status PodDeleteStatus
}
type podDeleteList struct {
items []podDelete
// PodDeleteList is a wrapper around []PodDelete
type PodDeleteList struct {
items []PodDelete
}
func (l *podDeleteList) Pods() []corev1.Pod {
// Pods returns a list of all pods marked for deletion after filtering.
func (l *PodDeleteList) Pods() []corev1.Pod {
pods := []corev1.Pod{}
for _, i := range l.items {
if i.status.delete {
pods = append(pods, i.pod)
if i.Status.Delete {
pods = append(pods, i.Pod)
}
}
return pods
}
func (l *podDeleteList) Warnings() string {
// Warnings returns all warning messages concatenated into a string.
func (l *PodDeleteList) Warnings() string {
ps := make(map[string][]string)
for _, i := range l.items {
if i.status.reason == podDeleteStatusTypeWarning {
ps[i.status.message] = append(ps[i.status.message], fmt.Sprintf("%s/%s", i.pod.Namespace, i.pod.Name))
if i.Status.Reason == PodDeleteStatusTypeWarning {
ps[i.Status.Message] = append(ps[i.Status.Message], fmt.Sprintf("%s/%s", i.Pod.Namespace, i.Pod.Name))
}
}
@ -71,15 +75,15 @@ func (l *podDeleteList) Warnings() string {
return strings.Join(msgs, "; ")
}
func (l *podDeleteList) errors() []error {
func (l *PodDeleteList) errors() []error {
failedPods := make(map[string][]string)
for _, i := range l.items {
if i.status.reason == podDeleteStatusTypeError {
msg := i.status.message
if i.Status.Reason == PodDeleteStatusTypeError {
msg := i.Status.Message
if msg == "" {
msg = "unexpected error"
}
failedPods[msg] = append(failedPods[msg], fmt.Sprintf("%s/%s", i.pod.Namespace, i.pod.Name))
failedPods[msg] = append(failedPods[msg], fmt.Sprintf("%s/%s", i.Pod.Namespace, i.Pod.Name))
}
}
errs := make([]error, 0)
@ -89,62 +93,72 @@ func (l *podDeleteList) errors() []error {
return errs
}
type podDeleteStatus struct {
delete bool
reason string
message string
// PodDeleteStatus informs filters if a pod should be deleted
type PodDeleteStatus struct {
Delete bool
Reason string
Message string
}
// Takes a pod and returns a PodDeleteStatus
type podFilter func(corev1.Pod) podDeleteStatus
// PodFilter takes a pod and returns a PodDeleteStatus
type PodFilter func(corev1.Pod) PodDeleteStatus
const (
podDeleteStatusTypeOkay = "Okay"
podDeleteStatusTypeSkip = "Skip"
podDeleteStatusTypeWarning = "Warning"
podDeleteStatusTypeError = "Error"
// PodDeleteStatusTypeOkay is "Okay"
PodDeleteStatusTypeOkay = "Okay"
// PodDeleteStatusTypeSkip is "Skip"
PodDeleteStatusTypeSkip = "Skip"
// PodDeleteStatusTypeWarning is "Warning"
PodDeleteStatusTypeWarning = "Warning"
// PodDeleteStatusTypeError is "Error"
PodDeleteStatusTypeError = "Error"
)
func makePodDeleteStatusOkay() podDeleteStatus {
return podDeleteStatus{
delete: true,
reason: podDeleteStatusTypeOkay,
// MakePodDeleteStatusOkay is a helper method to return the corresponding PodDeleteStatus
func MakePodDeleteStatusOkay() PodDeleteStatus {
return PodDeleteStatus{
Delete: true,
Reason: PodDeleteStatusTypeOkay,
}
}
func makePodDeleteStatusSkip() podDeleteStatus {
return podDeleteStatus{
delete: false,
reason: podDeleteStatusTypeSkip,
// MakePodDeleteStatusSkip is a helper method to return the corresponding PodDeleteStatus
func MakePodDeleteStatusSkip() PodDeleteStatus {
return PodDeleteStatus{
Delete: false,
Reason: PodDeleteStatusTypeSkip,
}
}
func makePodDeleteStatusWithWarning(delete bool, message string) podDeleteStatus {
return podDeleteStatus{
delete: delete,
reason: podDeleteStatusTypeWarning,
message: message,
// MakePodDeleteStatusWithWarning is a helper method to return the corresponding PodDeleteStatus
func MakePodDeleteStatusWithWarning(delete bool, message string) PodDeleteStatus {
return PodDeleteStatus{
Delete: delete,
Reason: PodDeleteStatusTypeWarning,
Message: message,
}
}
func makePodDeleteStatusWithError(message string) podDeleteStatus {
return podDeleteStatus{
delete: false,
reason: podDeleteStatusTypeError,
message: message,
// MakePodDeleteStatusWithError is a helper method to return the corresponding PodDeleteStatus
func MakePodDeleteStatusWithError(message string) PodDeleteStatus {
return PodDeleteStatus{
Delete: false,
Reason: PodDeleteStatusTypeError,
Message: message,
}
}
// The filters are applied in a specific order, only the last filter's
// message will be retained if there are any warnings.
func (d *Helper) makeFilters() []podFilter {
return []podFilter{
func (d *Helper) makeFilters() []PodFilter {
baseFilters := []PodFilter{
d.skipDeletedFilter,
d.daemonSetFilter,
d.mirrorPodFilter,
d.localStorageFilter,
d.unreplicatedFilter,
}
return append(baseFilters, d.AdditionalFilters...)
}
func hasLocalStorage(pod corev1.Pod) bool {
@ -157,7 +171,7 @@ func hasLocalStorage(pod corev1.Pod) bool {
return false
}
func (d *Helper) daemonSetFilter(pod corev1.Pod) podDeleteStatus {
func (d *Helper) daemonSetFilter(pod corev1.Pod) PodDeleteStatus {
// Note that we return false in cases where the pod is DaemonSet managed,
// regardless of flags.
//
@ -166,68 +180,68 @@ func (d *Helper) daemonSetFilter(pod corev1.Pod) podDeleteStatus {
// Such pods will be deleted if --force is used.
controllerRef := metav1.GetControllerOf(&pod)
if controllerRef == nil || controllerRef.Kind != appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind {
return makePodDeleteStatusOkay()
return MakePodDeleteStatusOkay()
}
// Any finished pod can be removed.
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
return makePodDeleteStatusOkay()
return MakePodDeleteStatusOkay()
}
if _, err := d.Client.AppsV1().DaemonSets(pod.Namespace).Get(context.TODO(), controllerRef.Name, metav1.GetOptions{}); err != nil {
// remove orphaned pods with a warning if --force is used
if apierrors.IsNotFound(err) && d.Force {
return makePodDeleteStatusWithWarning(true, err.Error())
return MakePodDeleteStatusWithWarning(true, err.Error())
}
return makePodDeleteStatusWithError(err.Error())
return MakePodDeleteStatusWithError(err.Error())
}
if !d.IgnoreAllDaemonSets {
return makePodDeleteStatusWithError(daemonSetFatal)
return MakePodDeleteStatusWithError(daemonSetFatal)
}
return makePodDeleteStatusWithWarning(false, daemonSetWarning)
return MakePodDeleteStatusWithWarning(false, daemonSetWarning)
}
func (d *Helper) mirrorPodFilter(pod corev1.Pod) podDeleteStatus {
func (d *Helper) mirrorPodFilter(pod corev1.Pod) PodDeleteStatus {
if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found {
return makePodDeleteStatusSkip()
return MakePodDeleteStatusSkip()
}
return makePodDeleteStatusOkay()
return MakePodDeleteStatusOkay()
}
func (d *Helper) localStorageFilter(pod corev1.Pod) podDeleteStatus {
func (d *Helper) localStorageFilter(pod corev1.Pod) PodDeleteStatus {
if !hasLocalStorage(pod) {
return makePodDeleteStatusOkay()
return MakePodDeleteStatusOkay()
}
// Any finished pod can be removed.
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
return makePodDeleteStatusOkay()
return MakePodDeleteStatusOkay()
}
if !d.DeleteLocalData {
return makePodDeleteStatusWithError(localStorageFatal)
return MakePodDeleteStatusWithError(localStorageFatal)
}
// TODO: this warning gets dropped by subsequent filters;
// consider accounting for multiple warning conditions or at least
// preserving the last warning message.
return makePodDeleteStatusWithWarning(true, localStorageWarning)
return MakePodDeleteStatusWithWarning(true, localStorageWarning)
}
func (d *Helper) unreplicatedFilter(pod corev1.Pod) podDeleteStatus {
func (d *Helper) unreplicatedFilter(pod corev1.Pod) PodDeleteStatus {
// any finished pod can be removed
if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed {
return makePodDeleteStatusOkay()
return MakePodDeleteStatusOkay()
}
controllerRef := metav1.GetControllerOf(&pod)
if controllerRef != nil {
return makePodDeleteStatusOkay()
return MakePodDeleteStatusOkay()
}
if d.Force {
return makePodDeleteStatusWithWarning(true, unmanagedWarning)
return MakePodDeleteStatusWithWarning(true, unmanagedWarning)
}
return makePodDeleteStatusWithError(unmanagedFatal)
return MakePodDeleteStatusWithError(unmanagedFatal)
}
func shouldSkipPod(pod corev1.Pod, skipDeletedTimeoutSeconds int) bool {
@ -236,9 +250,9 @@ func shouldSkipPod(pod corev1.Pod, skipDeletedTimeoutSeconds int) bool {
int(time.Now().Sub(pod.ObjectMeta.GetDeletionTimestamp().Time).Seconds()) > skipDeletedTimeoutSeconds
}
func (d *Helper) skipDeletedFilter(pod corev1.Pod) podDeleteStatus {
func (d *Helper) skipDeletedFilter(pod corev1.Pod) PodDeleteStatus {
if shouldSkipPod(pod, d.SkipWaitForDeleteTimeoutSeconds) {
return makePodDeleteStatusSkip()
return MakePodDeleteStatusSkip()
}
return makePodDeleteStatusOkay()
return MakePodDeleteStatusOkay()
}