address comments and add tests

This commit is contained in:
ymqytw
2016-10-18 16:00:54 -07:00
parent 3bd3c9570f
commit 55c6116d66
2 changed files with 124 additions and 9 deletions

View File

@@ -32,8 +32,8 @@ import (
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
"k8s.io/kubernetes/pkg/kubectl" "k8s.io/kubernetes/pkg/kubectl"
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/kubectl/resource" "k8s.io/kubernetes/pkg/kubectl/resource"
"k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/types"
@@ -54,6 +54,7 @@ type DrainOptions struct {
nodeInfo *resource.Info nodeInfo *resource.Info
out io.Writer out io.Writer
typer runtime.ObjectTyper typer runtime.ObjectTyper
ifPrint bool
} }
// Takes a pod and returns a bool indicating whether or not to operate on the // Takes a pod and returns a bool indicating whether or not to operate on the
@@ -197,6 +198,8 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
return err return err
} }
o.ifPrint = true
r := o.factory.NewBuilder(). r := o.factory.NewBuilder().
NamespaceParam(cmdNamespace).DefaultNamespace(). NamespaceParam(cmdNamespace).DefaultNamespace().
ResourceNames("node", args[0]). ResourceNames("node", args[0]).
@@ -400,26 +403,42 @@ func (o *DrainOptions) deletePods(pods []api.Pod) error {
} }
} }
return wait.PollImmediate(kubectl.Interval, o.Timeout, func() (bool, error) { getPodFn := func(namespace, name string) (*api.Pod, error) {
pendingPodCnt := 0 return o.client.Core().Pods(namespace).Get(name)
}
pendingPods, err := o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
if err != nil {
fmt.Fprintf(o.out, "There are pending pods when an error occured:\n")
for _, pendindPod := range pendingPods {
cmdutil.PrintSuccess(o.mapper, true, o.out, "pod", pendindPod.Name, false, "")
}
}
return err
}
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(namespace, name string) (*api.Pod, error)) ([]api.Pod, error) {
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
pendingPods := []api.Pod{}
for i, pod := range pods { for i, pod := range pods {
p, err := o.client.Core().Pods(pod.Namespace).Get(pod.Name) p, err := getPodFn(pod.Namespace, pod.Name)
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted") if o.ifPrint {
cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted")
}
continue continue
} else if err != nil { } else if err != nil {
return false, err return false, err
} else { } else {
pods[pendingPodCnt] = pods[i] pendingPods = append(pendingPods, pods[i])
pendingPodCnt++
} }
} }
if pendingPodCnt > 0 { pods = pendingPods
pods = pods[:pendingPodCnt] if len(pendingPods) > 0 {
return false, nil return false, nil
} }
return true, nil return true, nil
}) })
return pods, err
} }
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for // RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for

View File

@@ -18,8 +18,11 @@ package cmd
import ( import (
"bytes" "bytes"
"errors"
"fmt"
"io" "io"
"io/ioutil" "io/ioutil"
"math/rand"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
@@ -31,6 +34,7 @@ import (
"github.com/spf13/cobra" "github.com/spf13/cobra"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/batch" "k8s.io/kubernetes/pkg/apis/batch"
@@ -39,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/conversion" "k8s.io/kubernetes/pkg/conversion"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util" cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/types"
) )
var node *api.Node var node *api.Node
@@ -550,6 +555,97 @@ func TestDrain(t *testing.T) {
} }
} }
func TestDeletePods(t *testing.T) {
tests := []struct {
description string
interval time.Duration
timeout time.Duration
expectPendingPods bool
expectError bool
getPodFn func(namespace, name string) (*api.Pod, error)
}{
{
description: "Wait for deleting to complete",
interval: 100 * time.Millisecond,
timeout: 10 * time.Second,
expectPendingPods: false,
expectError: false,
getPodFn: func(namespace, name string) (*api.Pod, error) {
oldPodMap, _ := createPods(false)
newPodMap, _ := createPods(true)
if newPod, found := newPodMap[name]; found {
// randomly return old pod
if rand.Float32() < 0.6 {
oldPod := oldPodMap[name]
return &oldPod, nil
} else {
// randomly return a new pod or a NotFound error
if rand.Float32() < 0.5 {
return &newPod, nil
} else {
return &api.Pod{}, apierrors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, name)
}
}
}
return &api.Pod{}, apierrors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, name)
},
},
{
description: "Deleting could timeout",
interval: 200 * time.Millisecond,
timeout: 3 * time.Second,
expectPendingPods: true,
expectError: true,
getPodFn: func(namespace, name string) (*api.Pod, error) {
oldPodMap, _ := createPods(false)
if oldPod, found := oldPodMap[name]; found {
return &oldPod, nil
}
return &api.Pod{}, errors.New(fmt.Sprintf("%q: not found", name))
},
},
}
o := DrainOptions{}
o.ifPrint = false
for _, test := range tests {
_, pods := createPods(false)
pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, test.getPodFn)
if test.expectError && err == nil && test.expectPendingPods && len(pendingPods) > 0 {
t.Fatalf("%s: unexpected non-error", test.description)
}
if !test.expectError && err != nil && !test.expectPendingPods && len(pendingPods) == 0 {
t.Fatalf("%s: unexpected error", test.description)
}
}
}
func createPods(ifCreateNewPods bool) (map[string]api.Pod, []api.Pod) {
podMap := make(map[string]api.Pod)
podSlice := []api.Pod{}
for i := 0; i < 8; i++ {
var uid types.UID
if ifCreateNewPods {
uid = types.UID(i)
} else {
uid = types.UID(string(i) + string(i))
}
pod := api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "pod" + string(i),
Namespace: "default",
UID: uid,
},
}
podMap[pod.Name] = pod
podSlice = append(podSlice, pod)
}
return podMap, podSlice
}
type MyReq struct { type MyReq struct {
Request *http.Request Request *http.Request
} }