diff --git a/pkg/kubectl/cmd/drain.go b/pkg/kubectl/cmd/drain.go index 6a2e2e36073..7257f0c00d0 100644 --- a/pkg/kubectl/cmd/drain.go +++ b/pkg/kubectl/cmd/drain.go @@ -22,19 +22,23 @@ import ( "io" "reflect" "strings" + "time" "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/wait" ) type DrainOptions struct { @@ -44,11 +48,13 @@ type DrainOptions struct { Force bool GracePeriodSeconds int IgnoreDaemonsets bool + Timeout time.Duration DeleteLocalData bool mapper meta.RESTMapper nodeInfo *resource.Info out io.Writer typer runtime.ObjectTyper + ifPrint bool } // Takes a pod and returns a bool indicating whether or not to operate on the @@ -164,6 +170,7 @@ func NewCmdDrain(f cmdutil.Factory, out io.Writer) *cobra.Command { cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.") cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).") cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.") + cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up on a delete, zero means determine a timeout from the size of the object") return cmd } @@ -191,6 +198,8 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error { return err } + o.ifPrint = true + r := o.factory.NewBuilder(). NamespaceParam(cmdNamespace).DefaultNamespace(). ResourceNames("node", args[0]). @@ -392,10 +401,44 @@ func (o *DrainOptions) deletePods(pods []api.Pod) error { if err != nil { return err } - cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted") } - return nil + getPodFn := func(namespace, name string) (*api.Pod, error) { + return o.client.Core().Pods(namespace).Get(name) + } + pendingPods, err := o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn) + if err != nil { + fmt.Fprintf(o.out, "There are pending pods when an error occured:\n") + for _, pendindPod := range pendingPods { + cmdutil.PrintSuccess(o.mapper, true, o.out, "pod", pendindPod.Name, false, "") + } + } + return err +} + +func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(namespace, name string) (*api.Pod, error)) ([]api.Pod, error) { + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []api.Pod{} + for i, pod := range pods { + p, err := getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + if o.ifPrint { + cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted") + } + continue + } else if err != nil { + return false, err + } else { + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + return false, nil + } + return true, nil + }) + return pods, err } // RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for diff --git a/pkg/kubectl/cmd/drain_test.go b/pkg/kubectl/cmd/drain_test.go index 572d4b7f57f..d690da79a4d 100644 --- a/pkg/kubectl/cmd/drain_test.go +++ b/pkg/kubectl/cmd/drain_test.go @@ -18,6 +18,8 @@ package cmd import ( "bytes" + "errors" + "fmt" "io" "io/ioutil" "net/http" @@ -31,6 +33,7 @@ import ( "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" @@ -40,6 +43,8 @@ import ( cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/wait" ) var node *api.Node @@ -477,6 +482,8 @@ func TestDrain(t *testing.T) { return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil case m.isFor("GET", "/namespaces/default/replicasets/rs"): return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil + case m.isFor("GET", "/namespaces/default/pods/bar"): + return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, nil)}, nil case m.isFor("GET", "/pods"): values, err := url.ParseQuery(req.URL.RawQuery) if err != nil { @@ -549,6 +556,122 @@ func TestDrain(t *testing.T) { } } +func TestDeletePods(t *testing.T) { + ifHasBeenCalled := map[string]bool{} + tests := []struct { + description string + interval time.Duration + timeout time.Duration + expectPendingPods bool + expectError bool + expectedError *error + getPodFn func(namespace, name string) (*api.Pod, error) + }{ + { + description: "Wait for deleting to complete", + interval: 100 * time.Millisecond, + timeout: 10 * time.Second, + expectPendingPods: false, + expectError: false, + expectedError: nil, + getPodFn: func(namespace, name string) (*api.Pod, error) { + oldPodMap, _ := createPods(false) + newPodMap, _ := createPods(true) + if oldPod, found := oldPodMap[name]; found { + if _, ok := ifHasBeenCalled[name]; !ok { + ifHasBeenCalled[name] = true + return &oldPod, nil + } else { + if oldPod.ObjectMeta.Generation < 4 { + newPod := newPodMap[name] + return &newPod, nil + } else { + return nil, apierrors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, name) + } + } + } + return nil, apierrors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, name) + }, + }, + { + description: "Deleting could timeout", + interval: 200 * time.Millisecond, + timeout: 3 * time.Second, + expectPendingPods: true, + expectError: true, + expectedError: &wait.ErrWaitTimeout, + getPodFn: func(namespace, name string) (*api.Pod, error) { + oldPodMap, _ := createPods(false) + if oldPod, found := oldPodMap[name]; found { + return &oldPod, nil + } + return nil, errors.New(fmt.Sprintf("%q: not found", name)) + }, + }, + { + description: "Client error could be passed out", + interval: 200 * time.Millisecond, + timeout: 5 * time.Second, + expectPendingPods: true, + expectError: true, + expectedError: nil, + getPodFn: func(namespace, name string) (*api.Pod, error) { + return nil, errors.New("This is a random error for testing") + }, + }, + } + + o := DrainOptions{} + o.ifPrint = false + for _, test := range tests { + _, pods := createPods(false) + pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, test.getPodFn) + + if test.expectError { + if err == nil { + t.Fatalf("%s: unexpected non-error", test.description) + } else if test.expectedError != nil { + if *test.expectedError != err { + t.Fatalf("%s: the error does not match expected error", test.description) + } + } + } + if !test.expectError && err != nil { + t.Fatalf("%s: unexpected error", test.description) + } + if test.expectPendingPods && len(pendingPods) == 0 { + t.Fatalf("%s: unexpected empty pods", test.description) + } + if !test.expectPendingPods && len(pendingPods) > 0 { + t.Fatalf("%s: unexpected pending pods", test.description) + } + } +} + +func createPods(ifCreateNewPods bool) (map[string]api.Pod, []api.Pod) { + podMap := make(map[string]api.Pod) + podSlice := []api.Pod{} + for i := 0; i < 8; i++ { + var uid types.UID + if ifCreateNewPods { + uid = types.UID(i) + } else { + uid = types.UID(string(i) + string(i)) + } + pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod" + string(i), + Namespace: "default", + UID: uid, + Generation: int64(i), + }, + } + podMap[pod.Name] = pod + podSlice = append(podSlice, pod) + } + return podMap, podSlice +} + type MyReq struct { Request *http.Request }