Stop 'drain' deleting pods with local storage.

Unless forced with --delete-local-data. Also refactors the kubectl drain
logic that selects/rejects pods and produces error/warning messages.
Matt Liggett 2016-06-01 14:50:13 -07:00
parent e79f046990
commit d09af4a1d6
6 changed files with 196 additions and 130 deletions
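
For orientation, a minimal usage sketch mirroring the two new test cases added at the bottom of this commit (the node name `node-1` is hypothetical):

```
# Draining a node that runs pods with emptyDir volumes now fails with
# "pods with local storage (use --delete-local-data to override)".
kubectl drain node-1 --force

# Opting in deletes those pods (and their emptyDir data) and lets the drain proceed.
kubectl drain node-1 --force --delete-local-data
```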

View File

@@ -32,6 +32,10 @@ will make the node schedulable again.
 .SH OPTIONS
+.PP
+\fB\-\-delete\-local\-data\fP=false
+Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).
 .PP
 \fB\-\-force\fP=false
 Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet.

View File

@@ -73,6 +73,7 @@ $ kubectl drain foo --grace-period=900
 ### Options
 ```
+--delete-local-data[=false]: Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).
 --force[=false]: Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet.
 --grace-period=-1: Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.
 --ignore-daemonsets[=false]: Ignore DaemonSet-managed pods.
@@ -110,7 +111,7 @@ $ kubectl drain foo --grace-period=900
 * [kubectl](kubectl.md) - kubectl controls the Kubernetes cluster manager
 
-###### Auto generated by spf13/cobra on 15-Apr-2016
+###### Auto generated by spf13/cobra on 6-Jun-2016
 
 <!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
 [![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/docs/user-guide/kubectl/kubectl_drain.md?pixel)]()

View File

@@ -16,6 +16,10 @@ description: |
     When you are ready to put the node back into service, use kubectl uncordon, which
     will make the node schedulable again.
 options:
+- name: delete-local-data
+  default_value: "false"
+  usage: |
+    Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).
 - name: force
   default_value: "false"
   usage: |

View File

@@ -89,6 +89,7 @@ default-container-mem-limit
 delay-shutdown
 delete-collection-workers
 delete-instances
+delete-local-data
 delete-namespace
 deleting-pods-burst
 deleting-pods-qps

View File

@@ -27,9 +27,8 @@ import (
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/meta"
-	// "k8s.io/kubernetes/pkg/api/unversioned"
+	"k8s.io/kubernetes/pkg/controller"
 	client "k8s.io/kubernetes/pkg/client/unversioned"
-	"k8s.io/kubernetes/pkg/controller"
 	"k8s.io/kubernetes/pkg/fields"
 	cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
 	"k8s.io/kubernetes/pkg/kubectl/resource"
@@ -43,14 +42,31 @@ type DrainOptions struct {
 	Force              bool
 	GracePeriodSeconds int
 	IgnoreDaemonsets   bool
+	DeleteLocalData    bool
 	mapper             meta.RESTMapper
 	nodeInfo           *resource.Info
 	out                io.Writer
 	typer              runtime.ObjectTyper
 }
 
+// Takes a pod and returns a bool indicating whether or not to operate on the
+// pod, an optional warning message, and an optional fatal error.
+type podFilter func(api.Pod) (include bool, w *warning, f *fatal)
+
+type warning struct {
+	string
+}
+type fatal struct {
+	string
+}
+
 const (
+	kDaemonsetFatal      = "DaemonSet-managed pods (use --ignore-daemonsets to ignore)"
+	kDaemonsetWarning    = "Ignoring DaemonSet-managed pods"
+	kLocalStorageFatal   = "pods with local storage (use --delete-local-data to override)"
+	kLocalStorageWarning = "Deleting pods with local storage"
+	kUnmanagedFatal      = "pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet (use --force to override)"
+	kUnmanagedWarning    = "Deleting pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet"
+
 	cordon_long = `Mark node as unschedulable.
 `
 	cordon_example = `# Mark node "foo" as unschedulable.
 kubectl cordon foo
@@ -136,6 +152,7 @@ func NewCmdDrain(f *cmdutil.Factory, out io.Writer) *cobra.Command {
 	}
 	cmd.Flags().BoolVar(&options.Force, "force", false, "Continue even if there are pods not managed by a ReplicationController, ReplicaSet, Job, or DaemonSet.")
 	cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.")
+	cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
 	cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
 	return cmd
 }
@@ -195,148 +212,151 @@ func (o *DrainOptions) RunDrain() error {
 	return nil
 }
 
-// getPodsForDeletion returns all the pods we're going to delete. If there are
-// any unmanaged pods and the user didn't pass --force, we return that list in
-// an error.
-func (o *DrainOptions) getPodsForDeletion() ([]api.Pod, error) {
-	pods, unreplicatedPodNames, daemonSetPodNames, err := GetPodsForDeletionOnNodeDrain(
-		o.client,
-		o.nodeInfo.Name,
-		o.factory.Decoder(true),
-		o.Force,
-		o.IgnoreDaemonsets,
-	)
-	if err != nil {
-		return []api.Pod{}, err
-	}
-
-	daemonSetErrors := !o.IgnoreDaemonsets && len(daemonSetPodNames) > 0
-	unreplicatedErrors := !o.Force && len(unreplicatedPodNames) > 0
-
-	switch {
-	case daemonSetErrors && unreplicatedErrors:
-		return []api.Pod{}, errors.New(unmanagedMsg(unreplicatedPodNames, daemonSetPodNames, true))
-	case daemonSetErrors && !unreplicatedErrors:
-		return []api.Pod{}, errors.New(unmanagedMsg([]string{}, daemonSetPodNames, true))
-	case unreplicatedErrors && !daemonSetErrors:
-		return []api.Pod{}, errors.New(unmanagedMsg(unreplicatedPodNames, []string{}, true))
-	}
-
-	if len(unreplicatedPodNames) > 0 {
-		fmt.Fprintf(o.out, "WARNING: About to delete these %s\n", unmanagedMsg(unreplicatedPodNames, []string{}, false))
-	}
-	if len(daemonSetPodNames) > 0 {
-		fmt.Fprintf(o.out, "WARNING: Skipping %s\n", unmanagedMsg([]string{}, daemonSetPodNames, false))
-	}
-
-	return pods, nil
-}
-
-// GetPodsForDeletionOnNodeDrain returns pods that should be deleted on node drain as well as some extra information
-// about possibly problematic pods (unreplicated and deamon sets).
-func GetPodsForDeletionOnNodeDrain(client *client.Client, nodename string, decoder runtime.Decoder, force bool,
-	ignoreDeamonSet bool) (pods []api.Pod, unreplicatedPodNames []string, daemonSetPodNames []string, finalError error) {
-
-	pods = []api.Pod{}
-	unreplicatedPodNames = []string{}
-	daemonSetPodNames = []string{}
-
-	podList, err := client.Pods(api.NamespaceAll).List(api.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename})})
-	if err != nil {
-		return []api.Pod{}, []string{}, []string{}, err
-	}
-
-	for _, pod := range podList.Items {
-		_, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]
-		if found {
-			// Skip mirror pod
-			continue
-		}
-		replicated := false
-		daemonset_pod := false
-
-		creatorRef, found := pod.ObjectMeta.Annotations[controller.CreatedByAnnotation]
-		if found {
-			// Now verify that the specified creator actually exists.
-			var sr api.SerializedReference
-			if err := runtime.DecodeInto(decoder, []byte(creatorRef), &sr); err != nil {
-				return []api.Pod{}, []string{}, []string{}, err
-			}
-			if sr.Reference.Kind == "ReplicationController" {
-				rc, err := client.ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name)
-				// Assume the only reason for an error is because the RC is
-				// gone/missing, not for any other cause. TODO(mml): something more
-				// sophisticated than this
-				if err == nil && rc != nil {
-					replicated = true
-				}
-			} else if sr.Reference.Kind == "DaemonSet" {
-				ds, err := client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name)
-				// Assume the only reason for an error is because the DaemonSet is
-				// gone/missing, not for any other cause. TODO(mml): something more
-				// sophisticated than this
-				if err == nil && ds != nil {
-					// Otherwise, treat daemonset-managed pods as unmanaged since
-					// DaemonSet Controller currently ignores the unschedulable bit.
-					// FIXME(mml): Add link to the issue concerning a proper way to drain
-					// daemonset pods, probably using taints.
-					daemonset_pod = true
-				}
-			} else if sr.Reference.Kind == "Job" {
-				job, err := client.ExtensionsClient.Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
-				// Assume the only reason for an error is because the Job is
-				// gone/missing, not for any other cause. TODO(mml): something more
-				// sophisticated than this
-				if err == nil && job != nil {
-					replicated = true
-				}
-			} else if sr.Reference.Kind == "ReplicaSet" {
-				rs, err := client.ExtensionsClient.ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
-				// Assume the only reason for an error is because the RS is
-				// gone/missing, not for any other cause. TODO(mml): something more
-				// sophisticated than this
-				if err == nil && rs != nil {
-					replicated = true
-				}
-			}
-		}
-
-		switch {
-		case daemonset_pod:
-			daemonSetPodNames = append(daemonSetPodNames, pod.Name)
-		case !replicated:
-			unreplicatedPodNames = append(unreplicatedPodNames, pod.Name)
-			if force {
-				pods = append(pods, pod)
-			}
-		default:
-			pods = append(pods, pod)
-		}
-	}
-	return pods, unreplicatedPodNames, daemonSetPodNames, nil
-}
-
-// Helper for generating errors or warnings about unmanaged pods.
-func unmanagedMsg(unreplicatedNames []string, daemonSetNames []string, include_guidance bool) string {
-	msgs := []string{}
-	if len(unreplicatedNames) > 0 {
-		msg := fmt.Sprintf("pods not managed by ReplicationController, ReplicaSet, Job, or DaemonSet: %s", strings.Join(unreplicatedNames, ","))
-		if include_guidance {
-			msg += " (use --force to override)"
-		}
-		msgs = append(msgs, msg)
-	}
-	if len(daemonSetNames) > 0 {
-		msg := fmt.Sprintf("DaemonSet-managed pods: %s", strings.Join(daemonSetNames, ","))
-		if include_guidance {
-			msg += " (use --ignore-daemonsets to ignore)"
-		}
-		msgs = append(msgs, msg)
-	}
-	return strings.Join(msgs, " and ")
-}
+func (o *DrainOptions) getController(sr *api.SerializedReference) (interface{}, error) {
+	switch sr.Reference.Kind {
+	case "ReplicationController":
+		return o.client.ReplicationControllers(sr.Reference.Namespace).Get(sr.Reference.Name)
+	case "DaemonSet":
+		return o.client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name)
+	case "Job":
+		return o.client.ExtensionsClient.Jobs(sr.Reference.Namespace).Get(sr.Reference.Name)
+	case "ReplicaSet":
+		return o.client.ExtensionsClient.ReplicaSets(sr.Reference.Namespace).Get(sr.Reference.Name)
+	}
+	return nil, fmt.Errorf("Unknown controller kind %q", sr.Reference.Kind)
+}
+
+func (o *DrainOptions) getPodCreator(pod api.Pod) (*api.SerializedReference, error) {
+	creatorRef, found := pod.ObjectMeta.Annotations[controller.CreatedByAnnotation]
+	if !found {
+		return nil, nil
+	}
+	// Now verify that the specified creator actually exists.
+	sr := &api.SerializedReference{}
+	if err := runtime.DecodeInto(o.factory.Decoder(true), []byte(creatorRef), sr); err != nil {
+		return nil, err
+	}
+	// We assume the only reason for an error is because the controller is
+	// gone/missing, not for any other cause. TODO(mml): something more
+	// sophisticated than this
+	_, err := o.getController(sr)
+	if err != nil {
+		return nil, err
+	}
+	return sr, nil
+}
+
+func (o *DrainOptions) unreplicatedFilter(pod api.Pod) (bool, *warning, *fatal) {
+	sr, err := o.getPodCreator(pod)
+	if err != nil {
+		return false, nil, &fatal{err.Error()}
+	}
+	if sr != nil {
+		return true, nil, nil
+	}
+	if !o.Force {
+		return false, nil, &fatal{kUnmanagedFatal}
+	}
+	return true, &warning{kUnmanagedWarning}, nil
+}
+
+func (o *DrainOptions) daemonsetFilter(pod api.Pod) (bool, *warning, *fatal) {
+	// Note that we return false in all cases where the pod is DaemonSet managed,
+	// regardless of flags. We never delete them, the only question is whether
+	// their presence constitutes an error.
+	sr, err := o.getPodCreator(pod)
+	if err != nil {
+		return false, nil, &fatal{err.Error()}
+	}
+	if sr == nil || sr.Reference.Kind != "DaemonSet" {
+		return true, nil, nil
+	}
+	if _, err := o.client.DaemonSets(sr.Reference.Namespace).Get(sr.Reference.Name); err != nil {
+		return false, nil, &fatal{err.Error()}
+	}
+	if !o.IgnoreDaemonsets {
+		return false, nil, &fatal{kDaemonsetFatal}
+	}
+	return false, &warning{kDaemonsetWarning}, nil
+}
+
+func mirrorPodFilter(pod api.Pod) (bool, *warning, *fatal) {
+	if _, found := pod.ObjectMeta.Annotations[types.ConfigMirrorAnnotationKey]; found {
+		return false, nil, nil
+	}
+	return true, nil, nil
+}
+
+func hasLocalStorage(pod api.Pod) bool {
+	for _, volume := range pod.Spec.Volumes {
+		if volume.EmptyDir != nil {
+			return true
+		}
+	}
+	return false
+}
+
+func (o *DrainOptions) localStorageFilter(pod api.Pod) (bool, *warning, *fatal) {
+	if !hasLocalStorage(pod) {
+		return true, nil, nil
+	}
+	if !o.DeleteLocalData {
+		return false, nil, &fatal{kLocalStorageFatal}
+	}
+	return true, &warning{kLocalStorageWarning}, nil
+}
+
+// Map of status message to a list of pod names having that status.
+type podStatuses map[string][]string
+
+func (ps podStatuses) Message() string {
+	msgs := []string{}
+	for key, pods := range ps {
+		msgs = append(msgs, fmt.Sprintf("%s: %s", key, strings.Join(pods, ", ")))
+	}
+	return strings.Join(msgs, "; ")
+}
+
+// getPodsForDeletion returns all the pods we're going to delete. If there are
+// any pods preventing us from deleting, we return that list in an error.
+func (o *DrainOptions) getPodsForDeletion() (pods []api.Pod, err error) {
+	podList, err := o.client.Pods(api.NamespaceAll).List(api.ListOptions{
+		FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeInfo.Name})})
+	if err != nil {
+		return pods, err
+	}
+
+	ws := podStatuses{}
+	fs := podStatuses{}
+
+	for _, pod := range podList.Items {
+		podOk := true
+		for _, filt := range []podFilter{mirrorPodFilter, o.localStorageFilter, o.unreplicatedFilter, o.daemonsetFilter} {
+			filterOk, w, f := filt(pod)
+
+			podOk = podOk && filterOk
+			if w != nil {
+				ws[w.string] = append(ws[w.string], pod.Name)
+			}
+			if f != nil {
+				fs[f.string] = append(fs[f.string], pod.Name)
+			}
+		}
+		if podOk {
+			pods = append(pods, pod)
+		}
+	}
+
+	if len(fs) > 0 {
+		return []api.Pod{}, errors.New(fs.Message())
+	}
+	if len(ws) > 0 {
+		fmt.Fprintf(o.out, "WARNING: %s\n", ws.Message())
+	}
+	return pods, nil
+}
 
 // deletePods deletes the pods on the api server
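
To show the filter pattern above in isolation, here is a self-contained Go sketch of the same select/reject flow. The `pod` struct, the single filter, and the message grouping are simplified stand-ins for `api.Pod`, the `podFilter` chain, and `podStatuses` in drain.go; they are not the real kubectl code.

```go
// Standalone sketch of the filter/warning/fatal pattern used by the new drain logic.
package main

import (
	"fmt"
	"strings"
)

// pod is a trimmed-down stand-in for api.Pod.
type pod struct {
	Name        string
	HasEmptyDir bool
}

type warning struct{ string }
type fatal struct{ string }

// podFilter mirrors the drain.go type: keep the pod?, optional warning, optional fatal.
type podFilter func(pod) (include bool, w *warning, f *fatal)

const (
	localStorageFatal   = "pods with local storage (use --delete-local-data to override)"
	localStorageWarning = "Deleting pods with local storage"
)

// localStorageFilter is parameterized by the --delete-local-data setting.
func localStorageFilter(deleteLocalData bool) podFilter {
	return func(p pod) (bool, *warning, *fatal) {
		if !p.HasEmptyDir {
			return true, nil, nil
		}
		if !deleteLocalData {
			return false, nil, &fatal{localStorageFatal}
		}
		return true, &warning{localStorageWarning}, nil
	}
}

func main() {
	pods := []pod{{Name: "web-1"}, {Name: "scratch-1", HasEmptyDir: true}}
	filters := []podFilter{localStorageFilter(false)}

	deletable := []string{}
	ws := map[string][]string{} // warning message -> pod names
	fs := map[string][]string{} // fatal message -> pod names

	// Run every filter over every pod, grouping messages by their text,
	// the same shape as getPodsForDeletion above.
	for _, p := range pods {
		ok := true
		for _, filt := range filters {
			filterOk, w, f := filt(p)
			ok = ok && filterOk
			if w != nil {
				ws[w.string] = append(ws[w.string], p.Name)
			}
			if f != nil {
				fs[f.string] = append(fs[f.string], p.Name)
			}
		}
		if ok {
			deletable = append(deletable, p.Name)
		}
	}

	for msg, names := range fs {
		fmt.Printf("error: %s: %s\n", msg, strings.Join(names, ", "))
	}
	for msg, names := range ws {
		fmt.Printf("WARNING: %s: %s\n", msg, strings.Join(names, ", "))
	}
	fmt.Println("would delete:", strings.Join(deletable, ", "))
}
```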

View File

@@ -325,6 +325,24 @@ func TestDrain(t *testing.T) {
 		},
 	}
 
+	emptydir_pod := api.Pod{
+		ObjectMeta: api.ObjectMeta{
+			Name:              "bar",
+			Namespace:         "default",
+			CreationTimestamp: unversioned.Time{Time: time.Now()},
+			Labels:            labels,
+		},
+		Spec: api.PodSpec{
+			NodeName: "node",
+			Volumes: []api.Volume{
+				{
+					Name:         "scratch",
+					VolumeSource: api.VolumeSource{EmptyDir: &api.EmptyDirVolumeSource{Medium: ""}},
+				},
+			},
+		},
+	}
+
 	tests := []struct {
 		description string
 		node        *api.Node
@@ -406,6 +424,24 @@ func TestDrain(t *testing.T) {
 			expectFatal:  false,
 			expectDelete: true,
 		},
+		{
+			description:  "pod with EmptyDir",
+			node:         node,
+			expected:     cordoned_node,
+			pods:         []api.Pod{emptydir_pod},
+			args:         []string{"node", "--force"},
+			expectFatal:  true,
+			expectDelete: false,
+		},
+		{
+			description:  "pod with EmptyDir and --delete-local-data",
+			node:         node,
+			expected:     cordoned_node,
+			pods:         []api.Pod{emptydir_pod},
+			args:         []string{"node", "--force", "--delete-local-data=true"},
+			expectFatal:  false,
+			expectDelete: true,
+		},
 		{
 			description:  "empty node",
 			node:         node,