diff --git a/pkg/kubectl/BUILD b/pkg/kubectl/BUILD index 1d56ac8a7d4..e86c59c4596 100644 --- a/pkg/kubectl/BUILD +++ b/pkg/kubectl/BUILD @@ -107,6 +107,7 @@ filegroup( "//pkg/kubectl/apps:all-srcs", "//pkg/kubectl/cmd:all-srcs", "//pkg/kubectl/describe:all-srcs", + "//pkg/kubectl/drain:all-srcs", "//pkg/kubectl/explain:all-srcs", "//pkg/kubectl/generate:all-srcs", "//pkg/kubectl/generated:all-srcs", diff --git a/pkg/kubectl/cmd/drain/BUILD b/pkg/kubectl/cmd/drain/BUILD index b340ba7ef7e..2f8598e5f4f 100644 --- a/pkg/kubectl/cmd/drain/BUILD +++ b/pkg/kubectl/cmd/drain/BUILD @@ -7,27 +7,21 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/kubectl/cmd/util:go_default_library", + "//pkg/kubectl/drain:go_default_library", "//pkg/kubectl/scheme:go_default_library", "//pkg/kubectl/util/i18n:go_default_library", "//pkg/kubectl/util/templates:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/printers:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions/resource:go_default_library", - "//staging/src/k8s.io/client-go/kubernetes:go_default_library", - "//staging/src/k8s.io/client-go/rest:go_default_library", "//vendor/github.com/spf13/cobra:go_default_library", ], ) @@ -39,6 +33,7 @@ go_test( deps = [ "//pkg/kubectl/cmd/testing:go_default_library", "//pkg/kubectl/cmd/util:go_default_library", + "//pkg/kubectl/drain:go_default_library", "//pkg/kubectl/scheme:go_default_library", "//staging/src/k8s.io/api/apps/v1:go_default_library", "//staging/src/k8s.io/api/batch/v1:go_default_library", diff --git a/pkg/kubectl/cmd/drain/drain.go b/pkg/kubectl/cmd/drain/drain.go index 487b12af161..8e98e84ee6f 100644 --- a/pkg/kubectl/cmd/drain/drain.go +++ b/pkg/kubectl/cmd/drain/drain.go @@ -20,78 +20,41 @@ import ( "errors" "fmt" "math" - "strings" "time" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" - policyv1beta1 "k8s.io/api/policy/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/apimachinery/pkg/util/json" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - restclient "k8s.io/client-go/rest" - "k8s.io/apimachinery/pkg/runtime/schema" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/genericclioptions/printers" "k8s.io/cli-runtime/pkg/genericclioptions/resource" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" + "k8s.io/kubernetes/pkg/kubectl/drain" "k8s.io/kubernetes/pkg/kubectl/scheme" "k8s.io/kubernetes/pkg/kubectl/util/i18n" "k8s.io/kubernetes/pkg/kubectl/util/templates" ) -type DrainOptions struct { +type DrainCmdOptions struct { PrintFlags *genericclioptions.PrintFlags ToPrinter func(string) (printers.ResourcePrinterFunc, error) - Namespace string - client kubernetes.Interface - restClient *restclient.RESTClient - Force bool - DryRun bool - GracePeriodSeconds int - IgnoreDaemonsets bool - Timeout time.Duration - DeleteLocalData bool - Selector string - PodSelector string - nodeInfos []*resource.Info + Namespace string + + drainer *drain.Helper + nodeInfos []*resource.Info genericclioptions.IOStreams } -// Takes a pod and returns a bool indicating whether or not to operate on the -// pod, an optional warning message, and an optional fatal error. -type podFilter func(corev1.Pod) (include bool, w *warning, f *fatal) -type warning struct { - string -} -type fatal struct { - string -} - -const ( - EvictionKind = "Eviction" - EvictionSubresource = "pods/eviction" - - daemonsetFatal = "DaemonSet-managed Pods (use --ignore-daemonsets to ignore)" - daemonsetWarning = "ignoring DaemonSet-managed Pods" - localStorageFatal = "Pods with local storage (use --delete-local-data to override)" - localStorageWarning = "deleting Pods with local storage" - unmanagedFatal = "Pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use --force to override)" - unmanagedWarning = "deleting Pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet" -) - var ( cordonLong = templates.LongDesc(i18n.T(` Mark node as unschedulable.`)) @@ -102,7 +65,7 @@ var ( ) func NewCmdCordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command { - o := NewDrainOptions(f, ioStreams) + o := NewDrainCmdOptions(f, ioStreams) cmd := &cobra.Command{ Use: "cordon NODE", @@ -115,7 +78,7 @@ func NewCmdCordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cob cmdutil.CheckErr(o.RunCordonOrUncordon(true)) }, } - cmd.Flags().StringVarP(&o.Selector, "selector", "l", o.Selector, "Selector (label query) to filter on") + cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on") cmdutil.AddDryRunFlag(cmd) return cmd } @@ -130,7 +93,7 @@ var ( ) func NewCmdUncordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command { - o := NewDrainOptions(f, ioStreams) + o := NewDrainCmdOptions(f, ioStreams) cmd := &cobra.Command{ Use: "uncordon NODE", @@ -143,7 +106,7 @@ func NewCmdUncordon(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *c cmdutil.CheckErr(o.RunCordonOrUncordon(false)) }, } - cmd.Flags().StringVarP(&o.Selector, "selector", "l", o.Selector, "Selector (label query) to filter on") + cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on") cmdutil.AddDryRunFlag(cmd) return cmd } @@ -182,17 +145,19 @@ var ( $ kubectl drain foo --grace-period=900`)) ) -func NewDrainOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainOptions { - return &DrainOptions{ +func NewDrainCmdOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainCmdOptions { + return &DrainCmdOptions{ PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme), - - IOStreams: ioStreams, - GracePeriodSeconds: -1, + IOStreams: ioStreams, + drainer: &drain.Helper{ + GracePeriodSeconds: -1, + ErrOut: ioStreams.ErrOut, + }, } } func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command { - o := NewDrainOptions(f, ioStreams) + o := NewDrainCmdOptions(f, ioStreams) cmd := &cobra.Command{ Use: "drain NODE", @@ -205,13 +170,13 @@ func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobr cmdutil.CheckErr(o.RunDrain()) }, } - cmd.Flags().BoolVar(&o.Force, "force", o.Force, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet.") - cmd.Flags().BoolVar(&o.IgnoreDaemonsets, "ignore-daemonsets", o.IgnoreDaemonsets, "Ignore DaemonSet-managed pods.") - cmd.Flags().BoolVar(&o.DeleteLocalData, "delete-local-data", o.DeleteLocalData, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).") - cmd.Flags().IntVar(&o.GracePeriodSeconds, "grace-period", o.GracePeriodSeconds, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.") - cmd.Flags().DurationVar(&o.Timeout, "timeout", o.Timeout, "The length of time to wait before giving up, zero means infinite") - cmd.Flags().StringVarP(&o.Selector, "selector", "l", o.Selector, "Selector (label query) to filter on") - cmd.Flags().StringVarP(&o.PodSelector, "pod-selector", "", o.PodSelector, "Label selector to filter pods on the node") + cmd.Flags().BoolVar(&o.drainer.Force, "force", o.drainer.Force, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet.") + cmd.Flags().BoolVar(&o.drainer.IgnoreAllDaemonSets, "ignore-daemonsets", o.drainer.IgnoreAllDaemonSets, "Ignore DaemonSet-managed pods.") + cmd.Flags().BoolVar(&o.drainer.DeleteLocalData, "delete-local-data", o.drainer.DeleteLocalData, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).") + cmd.Flags().IntVar(&o.drainer.GracePeriodSeconds, "grace-period", o.drainer.GracePeriodSeconds, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.") + cmd.Flags().DurationVar(&o.drainer.Timeout, "timeout", o.drainer.Timeout, "The length of time to wait before giving up, zero means infinite") + cmd.Flags().StringVarP(&o.drainer.Selector, "selector", "l", o.drainer.Selector, "Selector (label query) to filter on") + cmd.Flags().StringVarP(&o.drainer.PodSelector, "pod-selector", "", o.drainer.PodSelector, "Label selector to filter pods on the node") cmdutil.AddDryRunFlag(cmd) return cmd @@ -219,33 +184,28 @@ func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobr // Complete populates some fields from the factory, grabs command line // arguments and looks up the node using Builder -func (o *DrainOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error { +func (o *DrainCmdOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []string) error { var err error if len(args) == 0 && !cmd.Flags().Changed("selector") { return cmdutil.UsageErrorf(cmd, fmt.Sprintf("USAGE: %s [flags]", cmd.Use)) } - if len(args) > 0 && len(o.Selector) > 0 { + if len(args) > 0 && len(o.drainer.Selector) > 0 { return cmdutil.UsageErrorf(cmd, "error: cannot specify both a node name and a --selector option") } - o.DryRun = cmdutil.GetDryRunFlag(cmd) + o.drainer.DryRun = cmdutil.GetDryRunFlag(cmd) - if o.client, err = f.KubernetesClientSet(); err != nil { + if o.drainer.Client, err = f.KubernetesClientSet(); err != nil { return err } - if len(o.PodSelector) > 0 { - if _, err := labels.Parse(o.PodSelector); err != nil { + if len(o.drainer.PodSelector) > 0 { + if _, err := labels.Parse(o.drainer.PodSelector); err != nil { return errors.New("--pod-selector= must be a valid label selector") } } - o.restClient, err = f.RESTClient() - if err != nil { - return err - } - o.nodeInfos = []*resource.Info{} o.Namespace, _, err = f.ToRawKubeConfigLoader().Namespace() @@ -255,7 +215,7 @@ func (o *DrainOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []st o.ToPrinter = func(operation string) (printers.ResourcePrinterFunc, error) { o.PrintFlags.NamePrintFlags.Operation = operation - if o.DryRun { + if o.drainer.DryRun { o.PrintFlags.Complete("%s (dry run)") } @@ -274,8 +234,8 @@ func (o *DrainOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []st SingleResourceType(). Flatten() - if len(o.Selector) > 0 { - builder = builder.LabelSelectorParam(o.Selector). + if len(o.drainer.Selector) > 0 { + builder = builder.LabelSelectorParam(o.drainer.Selector). ResourceTypes("nodes") } @@ -299,7 +259,7 @@ func (o *DrainOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []st } // RunDrain runs the 'drain' command -func (o *DrainOptions) RunDrain() error { +func (o *DrainCmdOptions) RunDrain() error { if err := o.RunCordonOrUncordon(true); err != nil { return err } @@ -314,10 +274,10 @@ func (o *DrainOptions) RunDrain() error { for _, info := range o.nodeInfos { var err error - if !o.DryRun { + if !o.drainer.DryRun { err = o.deleteOrEvictPodsSimple(info) } - if err == nil || o.DryRun { + if err == nil || o.drainer.DryRun { drainedNodes.Insert(info.Name) printObj(info.Object, o.Out) } else { @@ -344,218 +304,43 @@ func (o *DrainOptions) RunDrain() error { return fatal } -func (o *DrainOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error { - pods, err := o.getPodsForDeletion(nodeInfo) - if err != nil { - return err +func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error { + list, errs := o.drainer.GetPodsForDeletion(nodeInfo.Name) + if errs != nil { + return utilerrors.NewAggregate(errs) + } + if warnings := list.Warnings(); warnings != "" { + fmt.Fprintf(o.ErrOut, "WARNING: %s\n", warnings) } - err = o.deleteOrEvictPods(pods) - if err != nil { - pendingPods, newErr := o.getPodsForDeletion(nodeInfo) - if newErr != nil { - return newErr - } + if err := o.deleteOrEvictPods(list.Pods()); err != nil { + pendingList, newErrs := o.drainer.GetPodsForDeletion(nodeInfo.Name) + fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err) - for _, pendingPod := range pendingPods { + for _, pendingPod := range pendingList.Pods() { fmt.Fprintf(o.ErrOut, "%s/%s\n", "pod", pendingPod.Name) } - } - return err -} - -func (o *DrainOptions) getPodController(pod corev1.Pod) *metav1.OwnerReference { - return metav1.GetControllerOf(&pod) -} - -func (o *DrainOptions) unreplicatedFilter(pod corev1.Pod) (bool, *warning, *fatal) { - // any finished pod can be removed - if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { - return true, nil, nil - } - - controllerRef := o.getPodController(pod) - if controllerRef != nil { - return true, nil, nil - } - if o.Force { - return true, &warning{unmanagedWarning}, nil - } - - return false, nil, &fatal{unmanagedFatal} -} - -func (o *DrainOptions) daemonsetFilter(pod corev1.Pod) (bool, *warning, *fatal) { - // Note that we return false in cases where the pod is DaemonSet managed, - // regardless of flags. - // - // The exception is for pods that are orphaned (the referencing - // management resource - including DaemonSet - is not found). - // Such pods will be deleted if --force is used. - controllerRef := o.getPodController(pod) - if controllerRef == nil || controllerRef.Kind != "DaemonSet" { - return true, nil, nil - } - // Any finished pod can be removed. - if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { - return true, nil, nil - } - - if _, err := o.client.AppsV1().DaemonSets(pod.Namespace).Get(controllerRef.Name, metav1.GetOptions{}); err != nil { - // remove orphaned pods with a warning if --force is used - if apierrors.IsNotFound(err) && o.Force { - return true, &warning{err.Error()}, nil + if newErrs != nil { + fmt.Fprintf(o.ErrOut, "following errors also occurred:\n%s", utilerrors.NewAggregate(newErrs)) } - - return false, nil, &fatal{err.Error()} + return err } - - if !o.IgnoreDaemonsets { - return false, nil, &fatal{daemonsetFatal} - } - - return false, &warning{daemonsetWarning}, nil -} - -func mirrorPodFilter(pod corev1.Pod) (bool, *warning, *fatal) { - if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found { - return false, nil, nil - } - return true, nil, nil -} - -func hasLocalStorage(pod corev1.Pod) bool { - for _, volume := range pod.Spec.Volumes { - if volume.EmptyDir != nil { - return true - } - } - - return false -} - -func (o *DrainOptions) localStorageFilter(pod corev1.Pod) (bool, *warning, *fatal) { - if !hasLocalStorage(pod) { - return true, nil, nil - } - // Any finished pod can be removed. - if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { - return true, nil, nil - } - if !o.DeleteLocalData { - return false, nil, &fatal{localStorageFatal} - } - - return true, &warning{localStorageWarning}, nil -} - -// Map of status message to a list of pod names having that status. -type podStatuses map[string][]string - -func (ps podStatuses) Message() string { - msgs := []string{} - - for key, pods := range ps { - msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", "))) - } - return strings.Join(msgs, "; ") -} - -// getPodsForDeletion receives resource info for a node, and returns all the pods from the given node that we -// are planning on deleting. If there are any pods preventing us from deleting, we return that list in an error. -func (o *DrainOptions) getPodsForDeletion(nodeInfo *resource.Info) (pods []corev1.Pod, err error) { - labelSelector, err := labels.Parse(o.PodSelector) - if err != nil { - return pods, err - } - - podList, err := o.client.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{ - LabelSelector: labelSelector.String(), - FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeInfo.Name}).String()}) - if err != nil { - return pods, err - } - - ws := podStatuses{} - fs := podStatuses{} - - for _, pod := range podList.Items { - podOk := true - for _, filt := range []podFilter{o.daemonsetFilter, mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter} { - filterOk, w, f := filt(pod) - - podOk = podOk && filterOk - if w != nil { - ws[w.string] = append(ws[w.string], pod.Name) - } - if f != nil { - fs[f.string] = append(fs[f.string], pod.Name) - } - - // short-circuit as soon as pod not ok - // at that point, there is no reason to run pod - // through any additional filters - if !podOk { - break - } - } - if podOk { - pods = append(pods, pod) - } - } - - if len(fs) > 0 { - return []corev1.Pod{}, errors.New(fs.Message()) - } - if len(ws) > 0 { - fmt.Fprintf(o.ErrOut, "WARNING: %s\n", ws.Message()) - } - return pods, nil -} - -func (o *DrainOptions) deletePod(pod corev1.Pod) error { - deleteOptions := &metav1.DeleteOptions{} - if o.GracePeriodSeconds >= 0 { - gracePeriodSeconds := int64(o.GracePeriodSeconds) - deleteOptions.GracePeriodSeconds = &gracePeriodSeconds - } - return o.client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, deleteOptions) -} - -func (o *DrainOptions) evictPod(pod corev1.Pod, policyGroupVersion string) error { - deleteOptions := &metav1.DeleteOptions{} - if o.GracePeriodSeconds >= 0 { - gracePeriodSeconds := int64(o.GracePeriodSeconds) - deleteOptions.GracePeriodSeconds = &gracePeriodSeconds - } - eviction := &policyv1beta1.Eviction{ - TypeMeta: metav1.TypeMeta{ - APIVersion: policyGroupVersion, - Kind: EvictionKind, - }, - ObjectMeta: metav1.ObjectMeta{ - Name: pod.Name, - Namespace: pod.Namespace, - }, - DeleteOptions: deleteOptions, - } - // Remember to change change the URL manipulation func when Evction's version change - return o.client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction) + return nil } // deleteOrEvictPods deletes or evicts the pods on the api server -func (o *DrainOptions) deleteOrEvictPods(pods []corev1.Pod) error { +func (o *DrainCmdOptions) deleteOrEvictPods(pods []corev1.Pod) error { if len(pods) == 0 { return nil } - policyGroupVersion, err := SupportEviction(o.client) + policyGroupVersion, err := drain.CheckEvictionSupport(o.drainer.Client) if err != nil { return err } getPodFn := func(namespace, name string) (*corev1.Pod, error) { - return o.client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) + return o.drainer.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) } if len(policyGroupVersion) > 0 { @@ -565,14 +350,14 @@ func (o *DrainOptions) deleteOrEvictPods(pods []corev1.Pod) error { } } -func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { +func (o *DrainCmdOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { returnCh := make(chan error, 1) for _, pod := range pods { go func(pod corev1.Pod, returnCh chan error) { - var err error for { - err = o.evictPod(pod, policyGroupVersion) + fmt.Fprintf(o.Out, "evicting pod %q\n", pod.Name) + err := o.drainer.EvictPod(pod, policyGroupVersion) if err == nil { break } else if apierrors.IsNotFound(err) { @@ -586,8 +371,7 @@ func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, g return } } - podArray := []corev1.Pod{pod} - _, err = o.waitForDelete(podArray, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn) + _, err := o.waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn) if err == nil { returnCh <- nil } else { @@ -601,10 +385,10 @@ func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, g // 0 timeout means infinite, we use MaxInt64 to represent it. var globalTimeout time.Duration - if o.Timeout == 0 { + if o.drainer.Timeout == 0 { globalTimeout = time.Duration(math.MaxInt64) } else { - globalTimeout = o.Timeout + globalTimeout = o.drainer.Timeout } globalTimeoutCh := time.After(globalTimeout) numPods := len(pods) @@ -616,22 +400,22 @@ func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, g errors = append(errors, err) } case <-globalTimeoutCh: - return fmt.Errorf("Drain did not complete within %v", globalTimeout) + return fmt.Errorf("drain did not complete within %v", globalTimeout) } } return utilerrors.NewAggregate(errors) } -func (o *DrainOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { +func (o *DrainCmdOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { // 0 timeout means infinite, we use MaxInt64 to represent it. var globalTimeout time.Duration - if o.Timeout == 0 { + if o.drainer.Timeout == 0 { globalTimeout = time.Duration(math.MaxInt64) } else { - globalTimeout = o.Timeout + globalTimeout = o.drainer.Timeout } for _, pod := range pods { - err := o.deletePod(pod) + err := o.drainer.DeletePod(pod) if err != nil && !apierrors.IsNotFound(err) { return err } @@ -640,7 +424,7 @@ func (o *DrainOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, na return err } -func (o *DrainOptions) waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) { +func (o *DrainCmdOptions) waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) { var verbStr string if usingEviction { verbStr = "evicted" @@ -674,65 +458,29 @@ func (o *DrainOptions) waitForDelete(pods []corev1.Pod, interval, timeout time.D return pods, err } -// SupportEviction uses Discovery API to find out if the server support eviction subresource -// If support, it will return its groupVersion; Otherwise, it will return "" -func SupportEviction(clientset kubernetes.Interface) (string, error) { - discoveryClient := clientset.Discovery() - groupList, err := discoveryClient.ServerGroups() - if err != nil { - return "", err - } - foundPolicyGroup := false - var policyGroupVersion string - for _, group := range groupList.Groups { - if group.Name == "policy" { - foundPolicyGroup = true - policyGroupVersion = group.PreferredVersion.GroupVersion - break - } - } - if !foundPolicyGroup { - return "", nil - } - resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1") - if err != nil { - return "", err - } - for _, resource := range resourceList.APIResources { - if resource.Name == EvictionSubresource && resource.Kind == EvictionKind { - return policyGroupVersion, nil - } - } - return "", nil -} - // RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for // "Unschedulable" is passed as the first arg. -func (o *DrainOptions) RunCordonOrUncordon(desired bool) error { +func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error { cordonOrUncordon := "cordon" if !desired { cordonOrUncordon = "un" + cordonOrUncordon } for _, nodeInfo := range o.nodeInfos { - if nodeInfo.Mapping.GroupVersionKind.Kind == "Node" { - obj, err := scheme.Scheme.ConvertToVersion(nodeInfo.Object, nodeInfo.Mapping.GroupVersionKind.GroupVersion()) + + printError := func(err error) { + fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v\n", cordonOrUncordon, nodeInfo.Name, err) + } + + gvk := nodeInfo.ResourceMapping().GroupVersionKind + if gvk.Kind == "Node" { + c, err := drain.NewCordonHelperFromRuntimeObject(nodeInfo.Object, scheme.Scheme, gvk) if err != nil { - fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v\n", cordonOrUncordon, nodeInfo.Name, err) + printError(err) continue } - oldData, err := json.Marshal(obj) - if err != nil { - fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v\n", cordonOrUncordon, nodeInfo.Name, err) - continue - } - node, ok := obj.(*corev1.Node) - if !ok { - fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: unexpected Type%T, expected Node\n", cordonOrUncordon, nodeInfo.Name, obj) - continue - } - unsched := node.Spec.Unschedulable - if unsched == desired { + + if updateRequired := c.UpdateIfRequired(desired); !updateRequired { printObj, err := o.ToPrinter(already(desired)) if err != nil { fmt.Fprintf(o.ErrOut, "error: %v\n", err) @@ -740,22 +488,13 @@ func (o *DrainOptions) RunCordonOrUncordon(desired bool) error { } printObj(nodeInfo.Object, o.Out) } else { - if !o.DryRun { - helper := resource.NewHelper(o.restClient, nodeInfo.Mapping) - node.Spec.Unschedulable = desired - newData, err := json.Marshal(obj) - if err != nil { - fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v\n", cordonOrUncordon, nodeInfo.Name, err) - continue + if !o.drainer.DryRun { + err, patchErr := c.PatchOrReplace(o.drainer.Client) + if patchErr != nil { + printError(patchErr) } - patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, obj) if err != nil { - fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v\n", cordonOrUncordon, nodeInfo.Name, err) - continue - } - _, err = helper.Patch(o.Namespace, nodeInfo.Name, types.StrategicMergePatchType, patchBytes, nil) - if err != nil { - fmt.Fprintf(o.ErrOut, "error: unable to %s node %q: %v\n", cordonOrUncordon, nodeInfo.Name, err) + printError(err) continue } } diff --git a/pkg/kubectl/cmd/drain/drain_test.go b/pkg/kubectl/cmd/drain/drain_test.go index d0c3eb66233..3da3097505f 100644 --- a/pkg/kubectl/cmd/drain/drain_test.go +++ b/pkg/kubectl/cmd/drain/drain_test.go @@ -27,6 +27,7 @@ import ( "reflect" "strconv" "strings" + "sync/atomic" "testing" "time" @@ -48,6 +49,7 @@ import ( "k8s.io/client-go/rest/fake" cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" + "k8s.io/kubernetes/pkg/kubectl/drain" "k8s.io/kubernetes/pkg/kubectl/scheme" ) @@ -593,14 +595,15 @@ func TestDrain(t *testing.T) { expectDelete: false, }, { - description: "orphaned DS-managed pod with --force", - node: node, - expected: cordonedNode, - pods: []corev1.Pod{orphanedDsPod}, - rcs: []corev1.ReplicationController{}, - args: []string{"node", "--force"}, - expectFatal: false, - expectDelete: true, + description: "orphaned DS-managed pod with --force", + node: node, + expected: cordonedNode, + pods: []corev1.Pod{orphanedDsPod}, + rcs: []corev1.ReplicationController{}, + args: []string{"node", "--force"}, + expectFatal: false, + expectDelete: true, + expectWarning: "WARNING: deleting Pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet: default/bar", }, { description: "DS-managed pod with --ignore-daemonsets", @@ -619,7 +622,7 @@ func TestDrain(t *testing.T) { pods: []corev1.Pod{dsPodWithEmptyDir}, rcs: []corev1.ReplicationController{rc}, args: []string{"node", "--ignore-daemonsets"}, - expectWarning: "WARNING: ignoring DaemonSet-managed Pods: bar", + expectWarning: "WARNING: ignoring DaemonSet-managed Pods: default/bar", expectFatal: false, expectDelete: false, }, @@ -725,8 +728,7 @@ func TestDrain(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { newNode := &corev1.Node{} - deleted := false - evicted := false + var deletions, evictions int32 tf := cmdtesting.NewTestFactory() defer tf.Cleanup() @@ -763,8 +765,8 @@ func TestDrain(t *testing.T) { if testEviction { resourceList.APIResources = []metav1.APIResource{ { - Name: EvictionSubresource, - Kind: EvictionKind, + Name: drain.EvictionSubresource, + Kind: drain.EvictionKind, }, } } @@ -818,10 +820,11 @@ func TestDrain(t *testing.T) { } return &http.Response{StatusCode: 200, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, newNode)}, nil case m.isFor("DELETE", "/namespaces/default/pods/bar"): - deleted = true + atomic.AddInt32(&deletions, 1) return &http.Response{StatusCode: 204, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, &test.pods[0])}, nil case m.isFor("POST", "/namespaces/default/pods/bar/eviction"): - evicted = true + + atomic.AddInt32(&evictions, 1) return &http.Response{StatusCode: 201, Header: cmdtesting.DefaultHeader(), Body: cmdtesting.ObjBody(codec, &policyv1beta1.Eviction{})}, nil default: t.Fatalf("%s: unexpected request: %v %#v\n%#v", test.description, req.Method, req.URL, req) @@ -849,6 +852,8 @@ func TestDrain(t *testing.T) { }() if test.expectFatal { if !sawFatal { + //t.Logf("outBuf = %s", outBuf.String()) + //t.Logf("errBuf = %s", errBuf.String()) t.Fatalf("%s: unexpected non-error when using %s", test.description, currMethod) } } else { @@ -858,20 +863,34 @@ func TestDrain(t *testing.T) { } } + deleted := deletions > 0 + evicted := evictions > 0 + if test.expectDelete { // Test Delete if !testEviction && !deleted { t.Fatalf("%s: pod never deleted", test.description) } // Test Eviction - if testEviction && !evicted { - t.Fatalf("%s: pod never evicted", test.description) + if testEviction { + if !evicted { + t.Fatalf("%s: pod never evicted", test.description) + } + if evictions > 1 { + t.Fatalf("%s: asked to evict same pod %d too many times", test.description, evictions-1) + } } } if !test.expectDelete { if deleted { t.Fatalf("%s: unexpected delete when using %s", test.description, currMethod) } + if deletions > 1 { + t.Fatalf("%s: asked to deleted same pod %d too many times", test.description, deletions-1) + } + } + if deleted && evicted { + t.Fatalf("%s: same pod deleted %d times and evicted %d times", test.description, deletions, evictions) } if len(test.expectWarning) > 0 { @@ -958,7 +977,7 @@ func TestDeletePods(t *testing.T) { tf := cmdtesting.NewTestFactory() defer tf.Cleanup() - o := DrainOptions{ + o := DrainCmdOptions{ PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme), } o.Out = os.Stdout diff --git a/pkg/kubectl/drain/BUILD b/pkg/kubectl/drain/BUILD new file mode 100644 index 00000000000..aea03e06dcb --- /dev/null +++ b/pkg/kubectl/drain/BUILD @@ -0,0 +1,41 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = [ + "cordon.go", + "drain.go", + "filters.go", + ], + importpath = "k8s.io/kubernetes/pkg/kubectl/drain", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/api/apps/v1:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/fields:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/kubectl/drain/cordon.go b/pkg/kubectl/drain/cordon.go new file mode 100644 index 00000000000..e3eb77fdab8 --- /dev/null +++ b/pkg/kubectl/drain/cordon.go @@ -0,0 +1,97 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/json" + "k8s.io/apimachinery/pkg/util/strategicpatch" + "k8s.io/client-go/kubernetes" +) + +// CordonHelper wraps functionality to cordon/uncordon nodes +type CordonHelper struct { + node *corev1.Node + desired bool +} + +// NewCordonHelper returns a new CordonHelper +func NewCordonHelper(node *corev1.Node) *CordonHelper { + return &CordonHelper{ + node: node, + } +} + +// NewCordonHelperFromRuntimeObject returns a new CordonHelper, or an error if given object is not a +// node or cannot be encoded as JSON +func NewCordonHelperFromRuntimeObject(nodeObject runtime.Object, scheme *runtime.Scheme, gvk schema.GroupVersionKind) (*CordonHelper, error) { + nodeObject, err := scheme.ConvertToVersion(nodeObject, gvk.GroupVersion()) + if err != nil { + return nil, err + } + + node, ok := nodeObject.(*corev1.Node) + if !ok { + return nil, fmt.Errorf("unexpected type %T", nodeObject) + } + + return NewCordonHelper(node), nil +} + +// UpdateIfRequired returns true if c.node.Spec.Unschedulable isn't already set, +// or false when no change is needed +func (c *CordonHelper) UpdateIfRequired(desired bool) bool { + c.desired = desired + if c.node.Spec.Unschedulable == c.desired { + return false + } + return true +} + +// PatchOrReplace uses given clientset to update the node status, either by patching or +// updating the given node object; it may return error if the object cannot be encoded as +// JSON, or if either patch or update calls fail; it will also return a second error +// whenever creating a patch has failed +func (c *CordonHelper) PatchOrReplace(clientset kubernetes.Interface) (error, error) { + client := clientset.Core().Nodes() + + oldData, err := json.Marshal(c.node) + if err != nil { + return err, nil + } + + c.node.Spec.Unschedulable = c.desired + + newData, err := json.Marshal(c.node) + if err != nil { + return err, nil + } + + patchBytes, patchErr := strategicpatch.CreateTwoWayMergePatch(oldData, newData, c.node) + if patchErr == nil { + _, err = client.Patch(c.node.Name, types.StrategicMergePatchType, patchBytes) + } else { + _, err = client.Update(c.node) + } + return err, patchErr +} diff --git a/pkg/kubectl/drain/drain.go b/pkg/kubectl/drain/drain.go new file mode 100644 index 00000000000..22069ee66d5 --- /dev/null +++ b/pkg/kubectl/drain/drain.go @@ -0,0 +1,159 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "io" + "time" + + corev1 "k8s.io/api/core/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" +) + +const ( + // EvictionKind represents the kind of evictions object + EvictionKind = "Eviction" + // EvictionSubresource represents the kind of evictions object as pod's subresource + EvictionSubresource = "pods/eviction" +) + +// Helper contains the parameters to control the behaviour of drainer +type Helper struct { + Client kubernetes.Interface + Force bool + DryRun bool + GracePeriodSeconds int + IgnoreAllDaemonSets bool + Timeout time.Duration + DeleteLocalData bool + Selector string + PodSelector string + ErrOut io.Writer +} + +// CheckEvictionSupport uses Discovery API to find out if the server support +// eviction subresource If support, it will return its groupVersion; Otherwise, +// it will return an empty string +func CheckEvictionSupport(clientset kubernetes.Interface) (string, error) { + discoveryClient := clientset.Discovery() + groupList, err := discoveryClient.ServerGroups() + if err != nil { + return "", err + } + foundPolicyGroup := false + var policyGroupVersion string + for _, group := range groupList.Groups { + if group.Name == "policy" { + foundPolicyGroup = true + policyGroupVersion = group.PreferredVersion.GroupVersion + break + } + } + if !foundPolicyGroup { + return "", nil + } + resourceList, err := discoveryClient.ServerResourcesForGroupVersion("v1") + if err != nil { + return "", err + } + for _, resource := range resourceList.APIResources { + if resource.Name == EvictionSubresource && resource.Kind == EvictionKind { + return policyGroupVersion, nil + } + } + return "", nil +} + +func (d *Helper) makeDeleteOptions() *metav1.DeleteOptions { + deleteOptions := &metav1.DeleteOptions{} + if d.GracePeriodSeconds >= 0 { + gracePeriodSeconds := int64(d.GracePeriodSeconds) + deleteOptions.GracePeriodSeconds = &gracePeriodSeconds + } + return deleteOptions +} + +// DeletePod will delete the given pod, or return an error if it couldn't +func (d *Helper) DeletePod(pod corev1.Pod) error { + return d.Client.CoreV1().Pods(pod.Namespace).Delete(pod.Name, d.makeDeleteOptions()) +} + +// EvictPod will evict the give pod, or return an error if it couldn't +func (d *Helper) EvictPod(pod corev1.Pod, policyGroupVersion string) error { + eviction := &policyv1beta1.Eviction{ + TypeMeta: metav1.TypeMeta{ + APIVersion: policyGroupVersion, + Kind: EvictionKind, + }, + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + DeleteOptions: d.makeDeleteOptions(), + } + // Remember to change change the URL manipulation func when Eviction's version change + return d.Client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(eviction) +} + +// GetPodsForDeletion receives resource info for a node, and returns those pods as PodDeleteList, +// or error if it cannot list pods. All pods that are ready to be deleted can be obtained with .Pods(), +// and string with all warning can be obtained with .Warnings(), and .Errors() for all errors that +// occurred during deletion. +func (d *Helper) GetPodsForDeletion(nodeName string) (*podDeleteList, []error) { + labelSelector, err := labels.Parse(d.PodSelector) + if err != nil { + return nil, []error{err} + } + + podList, err := d.Client.CoreV1().Pods(metav1.NamespaceAll).List(metav1.ListOptions{ + LabelSelector: labelSelector.String(), + FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName}).String()}) + if err != nil { + return nil, []error{err} + } + + pods := []podDelete{} + + for _, pod := range podList.Items { + var status podDeleteStatus + for _, filter := range d.makeFilters() { + status = filter(pod) + if !status.delete { + // short-circuit as soon as pod is filtered out + // at that point, there is no reason to run pod + // through any additional filters + break + } + } + pods = append(pods, podDelete{ + pod: pod, + status: status, + }) + } + + list := &podDeleteList{items: pods} + + if errs := list.errors(); len(errs) > 0 { + return list, errs + } + + return list, nil +} diff --git a/pkg/kubectl/drain/filters.go b/pkg/kubectl/drain/filters.go new file mode 100644 index 00000000000..2cbba24563a --- /dev/null +++ b/pkg/kubectl/drain/filters.go @@ -0,0 +1,223 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "fmt" + "strings" + + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +const ( + daemonSetFatal = "DaemonSet-managed Pods (use --ignore-daemonsets to ignore)" + daemonSetWarning = "ignoring DaemonSet-managed Pods" + localStorageFatal = "Pods with local storage (use --delete-local-data to override)" + localStorageWarning = "deleting Pods with local storage" + unmanagedFatal = "Pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet (use --force to override)" + unmanagedWarning = "deleting Pods not managed by ReplicationController, ReplicaSet, Job, DaemonSet or StatefulSet" +) + +type podDelete struct { + pod corev1.Pod + status podDeleteStatus +} + +type podDeleteList struct { + items []podDelete +} + +func (l *podDeleteList) Pods() []corev1.Pod { + pods := []corev1.Pod{} + for _, i := range l.items { + if i.status.delete { + pods = append(pods, i.pod) + } + } + return pods +} + +func (l *podDeleteList) Warnings() string { + ps := make(map[string][]string) + for _, i := range l.items { + if i.status.reason == podDeleteStatusTypeWarning { + ps[i.status.message] = append(ps[i.status.message], fmt.Sprintf("%s/%s", i.pod.Namespace, i.pod.Name)) + } + } + + msgs := []string{} + for key, pods := range ps { + msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", "))) + } + return strings.Join(msgs, "; ") +} + +func (l *podDeleteList) errors() []error { + failedPods := make(map[string][]string) + for _, i := range l.items { + if i.status.reason == podDeleteStatusTypeError { + msg := i.status.message + if msg == "" { + msg = "unexpected error" + } + failedPods[msg] = append(failedPods[msg], fmt.Sprintf("%s/%s", i.pod.Namespace, i.pod.Name)) + } + } + errs := make([]error, 0) + for msg, pods := range failedPods { + errs = append(errs, fmt.Errorf("cannot delete %s: %s", msg, strings.Join(pods, ", "))) + } + return errs +} + +type podDeleteStatus struct { + delete bool + reason string + message string +} + +// Takes a pod and returns a PodDeleteStatus +type podFilter func(corev1.Pod) podDeleteStatus + +const ( + podDeleteStatusTypeOkay = "Okay" + podDeleteStatusTypeSkip = "Skip" + podDeleteStatusTypeWarning = "Warning" + podDeleteStatusTypeError = "Error" +) + +func makePodDeleteStatusOkay() podDeleteStatus { + return podDeleteStatus{ + delete: true, + reason: podDeleteStatusTypeOkay, + } +} + +func makePodDeleteStatusSkip() podDeleteStatus { + return podDeleteStatus{ + delete: false, + reason: podDeleteStatusTypeSkip, + } +} + +func makePodDeleteStatusWithWarning(delete bool, message string) podDeleteStatus { + return podDeleteStatus{ + delete: delete, + reason: podDeleteStatusTypeWarning, + message: message, + } +} + +func makePodDeleteStatusWithError(message string) podDeleteStatus { + return podDeleteStatus{ + delete: false, + reason: podDeleteStatusTypeError, + message: message, + } +} + +func (d *Helper) makeFilters() []podFilter { + return []podFilter{ + d.daemonSetFilter, + d.mirrorPodFilter, + d.localStorageFilter, + d.unreplicatedFilter, + } +} + +func hasLocalStorage(pod corev1.Pod) bool { + for _, volume := range pod.Spec.Volumes { + if volume.EmptyDir != nil { + return true + } + } + + return false +} + +func (d *Helper) daemonSetFilter(pod corev1.Pod) podDeleteStatus { + // Note that we return false in cases where the pod is DaemonSet managed, + // regardless of flags. + // + // The exception is for pods that are orphaned (the referencing + // management resource - including DaemonSet - is not found). + // Such pods will be deleted if --force is used. + controllerRef := metav1.GetControllerOf(&pod) + if controllerRef == nil || controllerRef.Kind != appsv1.SchemeGroupVersion.WithKind("DaemonSet").Kind { + return makePodDeleteStatusOkay() + } + // Any finished pod can be removed. + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return makePodDeleteStatusOkay() + } + + if _, err := d.Client.AppsV1().DaemonSets(pod.Namespace).Get(controllerRef.Name, metav1.GetOptions{}); err != nil { + // remove orphaned pods with a warning if --force is used + if apierrors.IsNotFound(err) && d.Force { + return makePodDeleteStatusWithWarning(true, err.Error()) + } + + return makePodDeleteStatusWithError(err.Error()) + } + + if !d.IgnoreAllDaemonSets { + return makePodDeleteStatusWithError(daemonSetFatal) + } + + return makePodDeleteStatusWithWarning(false, daemonSetWarning) +} + +func (d *Helper) mirrorPodFilter(pod corev1.Pod) podDeleteStatus { + if _, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]; found { + return makePodDeleteStatusSkip() + } + return makePodDeleteStatusOkay() +} + +func (d *Helper) localStorageFilter(pod corev1.Pod) podDeleteStatus { + if !hasLocalStorage(pod) { + return makePodDeleteStatusOkay() + } + // Any finished pod can be removed. + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return makePodDeleteStatusOkay() + } + if !d.DeleteLocalData { + return makePodDeleteStatusWithError(localStorageFatal) + } + + return makePodDeleteStatusWithWarning(true, localStorageWarning) +} + +func (d *Helper) unreplicatedFilter(pod corev1.Pod) podDeleteStatus { + // any finished pod can be removed + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + return makePodDeleteStatusOkay() + } + + controllerRef := metav1.GetControllerOf(&pod) + if controllerRef != nil { + return makePodDeleteStatusOkay() + } + if d.Force { + return makePodDeleteStatusWithWarning(true, unmanagedWarning) + } + return makePodDeleteStatusWithError(unmanagedFatal) +}