mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-08 03:33:56 +00:00
Merge pull request #34778 from ymqytw/safe_kubectl_drain
Automatic merge from submit-queue wait until the pods are deleted completely Drain the pods on a node safely by keeping polling until all pods has been deleted. ```release-note kubectl drain now waits until pods have been delete from the Node before exiting ``` Fixes: #34782
This commit is contained in:
commit
ad9ee6a50c
@ -22,19 +22,23 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
||||||
"k8s.io/kubernetes/pkg/api/meta"
|
"k8s.io/kubernetes/pkg/api/meta"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
"k8s.io/kubernetes/pkg/client/restclient"
|
"k8s.io/kubernetes/pkg/client/restclient"
|
||||||
"k8s.io/kubernetes/pkg/fields"
|
"k8s.io/kubernetes/pkg/fields"
|
||||||
|
"k8s.io/kubernetes/pkg/kubectl"
|
||||||
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
|
"k8s.io/kubernetes/pkg/kubectl/cmd/templates"
|
||||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
"k8s.io/kubernetes/pkg/kubectl/resource"
|
"k8s.io/kubernetes/pkg/kubectl/resource"
|
||||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
type DrainOptions struct {
|
type DrainOptions struct {
|
||||||
@ -44,11 +48,13 @@ type DrainOptions struct {
|
|||||||
Force bool
|
Force bool
|
||||||
GracePeriodSeconds int
|
GracePeriodSeconds int
|
||||||
IgnoreDaemonsets bool
|
IgnoreDaemonsets bool
|
||||||
|
Timeout time.Duration
|
||||||
DeleteLocalData bool
|
DeleteLocalData bool
|
||||||
mapper meta.RESTMapper
|
mapper meta.RESTMapper
|
||||||
nodeInfo *resource.Info
|
nodeInfo *resource.Info
|
||||||
out io.Writer
|
out io.Writer
|
||||||
typer runtime.ObjectTyper
|
typer runtime.ObjectTyper
|
||||||
|
ifPrint bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Takes a pod and returns a bool indicating whether or not to operate on the
|
// Takes a pod and returns a bool indicating whether or not to operate on the
|
||||||
@ -164,6 +170,7 @@ func NewCmdDrain(f cmdutil.Factory, out io.Writer) *cobra.Command {
|
|||||||
cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.")
|
cmd.Flags().BoolVar(&options.IgnoreDaemonsets, "ignore-daemonsets", false, "Ignore DaemonSet-managed pods.")
|
||||||
cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
|
cmd.Flags().BoolVar(&options.DeleteLocalData, "delete-local-data", false, "Continue even if there are pods using emptyDir (local data that will be deleted when the node is drained).")
|
||||||
cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
|
cmd.Flags().IntVar(&options.GracePeriodSeconds, "grace-period", -1, "Period of time in seconds given to each pod to terminate gracefully. If negative, the default value specified in the pod will be used.")
|
||||||
|
cmd.Flags().DurationVar(&options.Timeout, "timeout", 0, "The length of time to wait before giving up on a delete, zero means determine a timeout from the size of the object")
|
||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -191,6 +198,8 @@ func (o *DrainOptions) SetupDrain(cmd *cobra.Command, args []string) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
o.ifPrint = true
|
||||||
|
|
||||||
r := o.factory.NewBuilder().
|
r := o.factory.NewBuilder().
|
||||||
NamespaceParam(cmdNamespace).DefaultNamespace().
|
NamespaceParam(cmdNamespace).DefaultNamespace().
|
||||||
ResourceNames("node", args[0]).
|
ResourceNames("node", args[0]).
|
||||||
@ -392,10 +401,44 @@ func (o *DrainOptions) deletePods(pods []api.Pod) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
getPodFn := func(namespace, name string) (*api.Pod, error) {
|
||||||
|
return o.client.Core().Pods(namespace).Get(name)
|
||||||
|
}
|
||||||
|
pendingPods, err := o.waitForDelete(pods, kubectl.Interval, o.Timeout, getPodFn)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(o.out, "There are pending pods when an error occured:\n")
|
||||||
|
for _, pendindPod := range pendingPods {
|
||||||
|
cmdutil.PrintSuccess(o.mapper, true, o.out, "pod", pendindPod.Name, false, "")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *DrainOptions) waitForDelete(pods []api.Pod, interval, timeout time.Duration, getPodFn func(namespace, name string) (*api.Pod, error)) ([]api.Pod, error) {
|
||||||
|
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
|
||||||
|
pendingPods := []api.Pod{}
|
||||||
|
for i, pod := range pods {
|
||||||
|
p, err := getPodFn(pod.Namespace, pod.Name)
|
||||||
|
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
|
||||||
|
if o.ifPrint {
|
||||||
|
cmdutil.PrintSuccess(o.mapper, false, o.out, "pod", pod.Name, false, "deleted")
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
} else if err != nil {
|
||||||
|
return false, err
|
||||||
|
} else {
|
||||||
|
pendingPods = append(pendingPods, pods[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pods = pendingPods
|
||||||
|
if len(pendingPods) > 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
return pods, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
|
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
|
||||||
|
@ -18,6 +18,8 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
@ -31,6 +33,7 @@ import (
|
|||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
apierrors "k8s.io/kubernetes/pkg/api/errors"
|
||||||
"k8s.io/kubernetes/pkg/api/testapi"
|
"k8s.io/kubernetes/pkg/api/testapi"
|
||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/apis/batch"
|
"k8s.io/kubernetes/pkg/apis/batch"
|
||||||
@ -40,6 +43,8 @@ import (
|
|||||||
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
|
cmdtesting "k8s.io/kubernetes/pkg/kubectl/cmd/testing"
|
||||||
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
|
||||||
"k8s.io/kubernetes/pkg/runtime"
|
"k8s.io/kubernetes/pkg/runtime"
|
||||||
|
"k8s.io/kubernetes/pkg/types"
|
||||||
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
var node *api.Node
|
var node *api.Node
|
||||||
@ -477,6 +482,8 @@ func TestDrain(t *testing.T) {
|
|||||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &job)}, nil
|
||||||
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
|
case m.isFor("GET", "/namespaces/default/replicasets/rs"):
|
||||||
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil
|
return &http.Response{StatusCode: 200, Header: defaultHeader(), Body: objBody(testapi.Extensions.Codec(), &test.replicaSets[0])}, nil
|
||||||
|
case m.isFor("GET", "/namespaces/default/pods/bar"):
|
||||||
|
return &http.Response{StatusCode: 404, Header: defaultHeader(), Body: objBody(codec, nil)}, nil
|
||||||
case m.isFor("GET", "/pods"):
|
case m.isFor("GET", "/pods"):
|
||||||
values, err := url.ParseQuery(req.URL.RawQuery)
|
values, err := url.ParseQuery(req.URL.RawQuery)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -549,6 +556,122 @@ func TestDrain(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestDeletePods(t *testing.T) {
|
||||||
|
ifHasBeenCalled := map[string]bool{}
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
interval time.Duration
|
||||||
|
timeout time.Duration
|
||||||
|
expectPendingPods bool
|
||||||
|
expectError bool
|
||||||
|
expectedError *error
|
||||||
|
getPodFn func(namespace, name string) (*api.Pod, error)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
description: "Wait for deleting to complete",
|
||||||
|
interval: 100 * time.Millisecond,
|
||||||
|
timeout: 10 * time.Second,
|
||||||
|
expectPendingPods: false,
|
||||||
|
expectError: false,
|
||||||
|
expectedError: nil,
|
||||||
|
getPodFn: func(namespace, name string) (*api.Pod, error) {
|
||||||
|
oldPodMap, _ := createPods(false)
|
||||||
|
newPodMap, _ := createPods(true)
|
||||||
|
if oldPod, found := oldPodMap[name]; found {
|
||||||
|
if _, ok := ifHasBeenCalled[name]; !ok {
|
||||||
|
ifHasBeenCalled[name] = true
|
||||||
|
return &oldPod, nil
|
||||||
|
} else {
|
||||||
|
if oldPod.ObjectMeta.Generation < 4 {
|
||||||
|
newPod := newPodMap[name]
|
||||||
|
return &newPod, nil
|
||||||
|
} else {
|
||||||
|
return nil, apierrors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, name)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil, apierrors.NewNotFound(unversioned.GroupResource{Resource: "pods"}, name)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Deleting could timeout",
|
||||||
|
interval: 200 * time.Millisecond,
|
||||||
|
timeout: 3 * time.Second,
|
||||||
|
expectPendingPods: true,
|
||||||
|
expectError: true,
|
||||||
|
expectedError: &wait.ErrWaitTimeout,
|
||||||
|
getPodFn: func(namespace, name string) (*api.Pod, error) {
|
||||||
|
oldPodMap, _ := createPods(false)
|
||||||
|
if oldPod, found := oldPodMap[name]; found {
|
||||||
|
return &oldPod, nil
|
||||||
|
}
|
||||||
|
return nil, errors.New(fmt.Sprintf("%q: not found", name))
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Client error could be passed out",
|
||||||
|
interval: 200 * time.Millisecond,
|
||||||
|
timeout: 5 * time.Second,
|
||||||
|
expectPendingPods: true,
|
||||||
|
expectError: true,
|
||||||
|
expectedError: nil,
|
||||||
|
getPodFn: func(namespace, name string) (*api.Pod, error) {
|
||||||
|
return nil, errors.New("This is a random error for testing")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
o := DrainOptions{}
|
||||||
|
o.ifPrint = false
|
||||||
|
for _, test := range tests {
|
||||||
|
_, pods := createPods(false)
|
||||||
|
pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, test.getPodFn)
|
||||||
|
|
||||||
|
if test.expectError {
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("%s: unexpected non-error", test.description)
|
||||||
|
} else if test.expectedError != nil {
|
||||||
|
if *test.expectedError != err {
|
||||||
|
t.Fatalf("%s: the error does not match expected error", test.description)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !test.expectError && err != nil {
|
||||||
|
t.Fatalf("%s: unexpected error", test.description)
|
||||||
|
}
|
||||||
|
if test.expectPendingPods && len(pendingPods) == 0 {
|
||||||
|
t.Fatalf("%s: unexpected empty pods", test.description)
|
||||||
|
}
|
||||||
|
if !test.expectPendingPods && len(pendingPods) > 0 {
|
||||||
|
t.Fatalf("%s: unexpected pending pods", test.description)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createPods(ifCreateNewPods bool) (map[string]api.Pod, []api.Pod) {
|
||||||
|
podMap := make(map[string]api.Pod)
|
||||||
|
podSlice := []api.Pod{}
|
||||||
|
for i := 0; i < 8; i++ {
|
||||||
|
var uid types.UID
|
||||||
|
if ifCreateNewPods {
|
||||||
|
uid = types.UID(i)
|
||||||
|
} else {
|
||||||
|
uid = types.UID(string(i) + string(i))
|
||||||
|
}
|
||||||
|
pod := api.Pod{
|
||||||
|
ObjectMeta: api.ObjectMeta{
|
||||||
|
Name: "pod" + string(i),
|
||||||
|
Namespace: "default",
|
||||||
|
UID: uid,
|
||||||
|
Generation: int64(i),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
podMap[pod.Name] = pod
|
||||||
|
podSlice = append(podSlice, pod)
|
||||||
|
}
|
||||||
|
return podMap, podSlice
|
||||||
|
}
|
||||||
|
|
||||||
type MyReq struct {
|
type MyReq struct {
|
||||||
Request *http.Request
|
Request *http.Request
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user