diff --git a/pkg/kubectl/cmd/drain.go b/pkg/kubectl/cmd/drain.go index c11869b1589..7257f0c00d0 100644 --- a/pkg/kubectl/cmd/drain.go +++ b/pkg/kubectl/cmd/drain.go @@ -32,8 +32,8 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/kubectl/cmd/templates" "k8s.io/kubernetes/pkg/kubectl" + "k8s.io/kubernetes/pkg/kubectl/cmd/templates" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/kubelet/types" @@ -54,6 +54,7 @@ type DrainOptions struct { nodeInfo *resource.Info out io.Writer typer runtime.ObjectTyper + ifPrint bool } // Takes a pod and returns a bool indicating whether or not to operate on the @@ -197,6 +198,8 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error { return err } + o.ifPrint = true + r := o.factory.NewBuilder(). NamespaceParam(cmdNamespace).DefaultNamespace(). ResourceNames("node", args[0]). @@ -400,26 +403,42 @@ func (o *DrainOptions) deletePods(pods []api.Pod) error { } } - return wait.PollImmediate(kubectl.Interval, o.Timeout, func() (bool, error) { - pendingPodCnt := 0 + getPodFn := func(namespace, name string) (*api.Pod, error) { + return o.client.Core().Pods(namespace).Get(name) + } + pendingPods, err := o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn) + if err != nil { + fmt.Fprintf(o.out, "There are pending pods when an error occured:\n") + for _, pendindPod := range pendingPods { + cmdutil.PrintSuccess(o.mapper, true, o.out, "pod", pendindPod.Name, false, "") + } + } + return err +} + +func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(namespace, name string) (*api.Pod, error)) ([]api.Pod, error) { + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []api.Pod{} for i, pod := range pods { - p, err := o.client.Core().Pods(pod.Namespace).Get(pod.Name) + p, err := getPodFn(pod.Namespace, pod.Name) if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { - cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted") + if o.ifPrint { + cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted") + } continue } else if err != nil { return false, err } else { - pods[pendingPodCnt] = pods[i] - pendingPodCnt++ + pendingPods = append(pendingPods, pods[i]) } } - if pendingPodCnt > 0 { - pods = pods[:pendingPodCnt] + pods = pendingPods + if len(pendingPods) > 0 { return false, nil } return true, nil }) + return pods, err } // RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for diff --git a/pkg/kubectl/cmd/drain_test.go b/pkg/kubectl/cmd/drain_test.go index 431bb3e0e69..5f552761c8a 100644 --- a/pkg/kubectl/cmd/drain_test.go +++ b/pkg/kubectl/cmd/drain_test.go @@ -18,8 +18,11 @@ package cmd import ( "bytes" + "errors" + "fmt" "io" "io/ioutil" + "math/rand" "net/http" "net/url" "os" @@ -31,6 +34,7 @@ import ( "github.com/spf13/cobra" "k8s.io/kubernetes/pkg/api" + apierrors "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/batch" @@ -39,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/conversion" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/types" ) var node *api.Node @@ -550,6 +555,97 @@ func TestDrain(t *testing.T) { } } +func TestDeletePods(t *testing.T) { + tests := []struct { + description string + interval time.Duration + timeout time.Duration + expectPendingPods bool + expectError bool + getPodFn func(namespace, name string) (*api.Pod, error) + }{ + { + description: "Wait for deleting to complete", + interval: 100 * time.Millisecond, + timeout: 10 * time.Second, + expectPendingPods: false, + expectError: false, + getPodFn: func(namespace, name string) (*api.Pod, error) { + oldPodMap, _ := createPods(false) + newPodMap, _ := createPods(true) + if newPod, found := newPodMap[name]; found { + // randomly return old pod + if rand.Float32() < 0.6 { + oldPod := oldPodMap[name] + return &oldPod, nil + } else { + // randomly return a new pod or a NotFound error + if rand.Float32() < 0.5 { + return &newPod, nil + } else { + return &api.Pod{}, apierrors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, name) + } + } + } + return &api.Pod{}, apierrors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, name) + }, + }, + { + description: "Deleting could timeout", + interval: 200 * time.Millisecond, + timeout: 3 * time.Second, + expectPendingPods: true, + expectError: true, + getPodFn: func(namespace, name string) (*api.Pod, error) { + oldPodMap, _ := createPods(false) + if oldPod, found := oldPodMap[name]; found { + return &oldPod, nil + } + return &api.Pod{}, errors.New(fmt.Sprintf("%q: not found", name)) + }, + }, + } + + o := DrainOptions{} + o.ifPrint = false + for _, test := range tests { + _, pods := createPods(false) + pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, test.getPodFn) + + if test.expectError && err == nil && test.expectPendingPods && len(pendingPods) > 0 { + t.Fatalf("%s: unexpected non-error", test.description) + } + if !test.expectError && err != nil && !test.expectPendingPods && len(pendingPods) == 0 { + t.Fatalf("%s: unexpected error", test.description) + } + + } + +} + +func createPods(ifCreateNewPods bool) (map[string]api.Pod, []api.Pod) { + podMap := make(map[string]api.Pod) + podSlice := []api.Pod{} + for i := 0; i < 8; i++ { + var uid types.UID + if ifCreateNewPods { + uid = types.UID(i) + } else { + uid = types.UID(string(i) + string(i)) + } + pod := api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "pod" + string(i), + Namespace: "default", + UID: uid, + }, + } + podMap[pod.Name] = pod + podSlice = append(podSlice, pod) + } + return podMap, podSlice +} + type MyReq struct { Request *http.Request }