mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-28 22:17:14 +00:00
Merge pull request #80045 from justinsb/drain_more_reusable
Make drain library more reusable
This commit is contained in:
commit
b97d08fbe0
@ -8,13 +8,10 @@ go_library(
|
|||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
deps = [
|
deps = [
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
|
||||||
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
|
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
|
||||||
"//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
|
"//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
|
||||||
"//staging/src/k8s.io/cli-runtime/pkg/resource:go_default_library",
|
"//staging/src/k8s.io/cli-runtime/pkg/resource:go_default_library",
|
||||||
@ -36,15 +33,11 @@ go_test(
|
|||||||
"//staging/src/k8s.io/api/batch/v1:go_default_library",
|
"//staging/src/k8s.io/api/batch/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
|
||||||
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
|
"//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library",
|
||||||
"//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
|
|
||||||
"//staging/src/k8s.io/client-go/rest/fake:go_default_library",
|
"//staging/src/k8s.io/client-go/rest/fake:go_default_library",
|
||||||
"//staging/src/k8s.io/kubectl/pkg/cmd/testing:go_default_library",
|
"//staging/src/k8s.io/kubectl/pkg/cmd/testing:go_default_library",
|
||||||
"//staging/src/k8s.io/kubectl/pkg/cmd/util:go_default_library",
|
"//staging/src/k8s.io/kubectl/pkg/cmd/util:go_default_library",
|
||||||
|
@ -19,19 +19,14 @@ package drain
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
|
|
||||||
"k8s.io/cli-runtime/pkg/genericclioptions"
|
"k8s.io/cli-runtime/pkg/genericclioptions"
|
||||||
"k8s.io/cli-runtime/pkg/printers"
|
"k8s.io/cli-runtime/pkg/printers"
|
||||||
@ -146,14 +141,34 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func NewDrainCmdOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainCmdOptions {
|
func NewDrainCmdOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainCmdOptions {
|
||||||
return &DrainCmdOptions{
|
o := &DrainCmdOptions{
|
||||||
PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
|
PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
|
||||||
IOStreams: ioStreams,
|
IOStreams: ioStreams,
|
||||||
drainer: &drain.Helper{
|
drainer: &drain.Helper{
|
||||||
GracePeriodSeconds: -1,
|
GracePeriodSeconds: -1,
|
||||||
|
Out: ioStreams.Out,
|
||||||
ErrOut: ioStreams.ErrOut,
|
ErrOut: ioStreams.ErrOut,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
o.drainer.OnPodDeletedOrEvicted = o.onPodDeletedOrEvicted
|
||||||
|
return o
|
||||||
|
}
|
||||||
|
|
||||||
|
// onPodDeletedOrEvicted is called by drain.Helper, when the pod has been deleted or evicted
|
||||||
|
func (o *DrainCmdOptions) onPodDeletedOrEvicted(pod *corev1.Pod, usingEviction bool) {
|
||||||
|
var verbStr string
|
||||||
|
if usingEviction {
|
||||||
|
verbStr = "evicted"
|
||||||
|
} else {
|
||||||
|
verbStr = "deleted"
|
||||||
|
}
|
||||||
|
printObj, err := o.ToPrinter(verbStr)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(o.ErrOut, "error building printer: %v\n", err)
|
||||||
|
fmt.Fprintf(o.Out, "pod %s/%s %s\n", pod.Namespace, pod.Name, verbStr)
|
||||||
|
} else {
|
||||||
|
printObj(pod, o.Out)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
|
func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command {
|
||||||
@ -313,7 +328,7 @@ func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error
|
|||||||
fmt.Fprintf(o.ErrOut, "WARNING: %s\n", warnings)
|
fmt.Fprintf(o.ErrOut, "WARNING: %s\n", warnings)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := o.deleteOrEvictPods(list.Pods()); err != nil {
|
if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil {
|
||||||
pendingList, newErrs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
|
pendingList, newErrs := o.drainer.GetPodsForDeletion(nodeInfo.Name)
|
||||||
|
|
||||||
fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err)
|
fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err)
|
||||||
@ -328,136 +343,6 @@ func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// deleteOrEvictPods deletes or evicts the pods on the api server
|
|
||||||
func (o *DrainCmdOptions) deleteOrEvictPods(pods []corev1.Pod) error {
|
|
||||||
if len(pods) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
policyGroupVersion, err := drain.CheckEvictionSupport(o.drainer.Client)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
getPodFn := func(namespace, name string) (*corev1.Pod, error) {
|
|
||||||
return o.drainer.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(policyGroupVersion) > 0 {
|
|
||||||
return o.evictPods(pods, policyGroupVersion, getPodFn)
|
|
||||||
} else {
|
|
||||||
return o.deletePods(pods, getPodFn)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o *DrainCmdOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
|
|
||||||
returnCh := make(chan error, 1)
|
|
||||||
|
|
||||||
for _, pod := range pods {
|
|
||||||
go func(pod corev1.Pod, returnCh chan error) {
|
|
||||||
for {
|
|
||||||
fmt.Fprintf(o.Out, "evicting pod %q\n", pod.Name)
|
|
||||||
err := o.drainer.EvictPod(pod, policyGroupVersion)
|
|
||||||
if err == nil {
|
|
||||||
break
|
|
||||||
} else if apierrors.IsNotFound(err) {
|
|
||||||
returnCh <- nil
|
|
||||||
return
|
|
||||||
} else if apierrors.IsTooManyRequests(err) {
|
|
||||||
fmt.Fprintf(o.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err)
|
|
||||||
time.Sleep(5 * time.Second)
|
|
||||||
} else {
|
|
||||||
returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := o.waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn)
|
|
||||||
if err == nil {
|
|
||||||
returnCh <- nil
|
|
||||||
} else {
|
|
||||||
returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
|
|
||||||
}
|
|
||||||
}(pod, returnCh)
|
|
||||||
}
|
|
||||||
|
|
||||||
doneCount := 0
|
|
||||||
var errors []error
|
|
||||||
|
|
||||||
// 0 timeout means infinite, we use MaxInt64 to represent it.
|
|
||||||
var globalTimeout time.Duration
|
|
||||||
if o.drainer.Timeout == 0 {
|
|
||||||
globalTimeout = time.Duration(math.MaxInt64)
|
|
||||||
} else {
|
|
||||||
globalTimeout = o.drainer.Timeout
|
|
||||||
}
|
|
||||||
globalTimeoutCh := time.After(globalTimeout)
|
|
||||||
numPods := len(pods)
|
|
||||||
for doneCount < numPods {
|
|
||||||
select {
|
|
||||||
case err := <-returnCh:
|
|
||||||
doneCount++
|
|
||||||
if err != nil {
|
|
||||||
errors = append(errors, err)
|
|
||||||
}
|
|
||||||
case <-globalTimeoutCh:
|
|
||||||
return fmt.Errorf("drain did not complete within %v", globalTimeout)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return utilerrors.NewAggregate(errors)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o *DrainCmdOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
|
|
||||||
// 0 timeout means infinite, we use MaxInt64 to represent it.
|
|
||||||
var globalTimeout time.Duration
|
|
||||||
if o.drainer.Timeout == 0 {
|
|
||||||
globalTimeout = time.Duration(math.MaxInt64)
|
|
||||||
} else {
|
|
||||||
globalTimeout = o.drainer.Timeout
|
|
||||||
}
|
|
||||||
for _, pod := range pods {
|
|
||||||
err := o.drainer.DeletePod(pod)
|
|
||||||
if err != nil && !apierrors.IsNotFound(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
_, err := o.waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (o *DrainCmdOptions) waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) {
|
|
||||||
var verbStr string
|
|
||||||
if usingEviction {
|
|
||||||
verbStr = "evicted"
|
|
||||||
} else {
|
|
||||||
verbStr = "deleted"
|
|
||||||
}
|
|
||||||
printObj, err := o.ToPrinter(verbStr)
|
|
||||||
if err != nil {
|
|
||||||
return pods, err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = wait.PollImmediate(interval, timeout, func() (bool, error) {
|
|
||||||
pendingPods := []corev1.Pod{}
|
|
||||||
for i, pod := range pods {
|
|
||||||
p, err := getPodFn(pod.Namespace, pod.Name)
|
|
||||||
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
|
|
||||||
printObj(&pod, o.Out)
|
|
||||||
continue
|
|
||||||
} else if err != nil {
|
|
||||||
return false, err
|
|
||||||
} else {
|
|
||||||
pendingPods = append(pendingPods, pods[i])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pods = pendingPods
|
|
||||||
if len(pendingPods) > 0 {
|
|
||||||
return false, nil
|
|
||||||
}
|
|
||||||
return true, nil
|
|
||||||
})
|
|
||||||
return pods, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
|
// RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for
|
||||||
// "Unschedulable" is passed as the first arg.
|
// "Unschedulable" is passed as the first arg.
|
||||||
func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {
|
func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error {
|
||||||
|
@ -17,15 +17,11 @@ limitations under the License.
|
|||||||
package drain
|
package drain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
|
||||||
"fmt"
|
|
||||||
"io"
|
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"testing"
|
"testing"
|
||||||
@ -38,14 +34,10 @@ import (
|
|||||||
batchv1 "k8s.io/api/batch/v1"
|
batchv1 "k8s.io/api/batch/v1"
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
policyv1beta1 "k8s.io/api/policy/v1beta1"
|
policyv1beta1 "k8s.io/api/policy/v1beta1"
|
||||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/runtime"
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/types"
|
|
||||||
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
"k8s.io/apimachinery/pkg/util/strategicpatch"
|
||||||
"k8s.io/apimachinery/pkg/util/wait"
|
|
||||||
"k8s.io/cli-runtime/pkg/printers"
|
|
||||||
"k8s.io/client-go/rest/fake"
|
"k8s.io/client-go/rest/fake"
|
||||||
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
|
cmdtesting "k8s.io/kubectl/pkg/cmd/testing"
|
||||||
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
cmdutil "k8s.io/kubectl/pkg/cmd/util"
|
||||||
@ -907,135 +899,6 @@ func TestDrain(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDeletePods(t *testing.T) {
|
|
||||||
ifHasBeenCalled := map[string]bool{}
|
|
||||||
tests := []struct {
|
|
||||||
description string
|
|
||||||
interval time.Duration
|
|
||||||
timeout time.Duration
|
|
||||||
expectPendingPods bool
|
|
||||||
expectError bool
|
|
||||||
expectedError *error
|
|
||||||
getPodFn func(namespace, name string) (*corev1.Pod, error)
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
description: "Wait for deleting to complete",
|
|
||||||
interval: 100 * time.Millisecond,
|
|
||||||
timeout: 10 * time.Second,
|
|
||||||
expectPendingPods: false,
|
|
||||||
expectError: false,
|
|
||||||
expectedError: nil,
|
|
||||||
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
|
|
||||||
oldPodMap, _ := createPods(false)
|
|
||||||
newPodMap, _ := createPods(true)
|
|
||||||
if oldPod, found := oldPodMap[name]; found {
|
|
||||||
if _, ok := ifHasBeenCalled[name]; !ok {
|
|
||||||
ifHasBeenCalled[name] = true
|
|
||||||
return &oldPod, nil
|
|
||||||
}
|
|
||||||
if oldPod.ObjectMeta.Generation < 4 {
|
|
||||||
newPod := newPodMap[name]
|
|
||||||
return &newPod, nil
|
|
||||||
}
|
|
||||||
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
|
|
||||||
|
|
||||||
}
|
|
||||||
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
description: "Deleting could timeout",
|
|
||||||
interval: 200 * time.Millisecond,
|
|
||||||
timeout: 3 * time.Second,
|
|
||||||
expectPendingPods: true,
|
|
||||||
expectError: true,
|
|
||||||
expectedError: &wait.ErrWaitTimeout,
|
|
||||||
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
|
|
||||||
oldPodMap, _ := createPods(false)
|
|
||||||
if oldPod, found := oldPodMap[name]; found {
|
|
||||||
return &oldPod, nil
|
|
||||||
}
|
|
||||||
return nil, fmt.Errorf("%q: not found", name)
|
|
||||||
},
|
|
||||||
},
|
|
||||||
{
|
|
||||||
description: "Client error could be passed out",
|
|
||||||
interval: 200 * time.Millisecond,
|
|
||||||
timeout: 5 * time.Second,
|
|
||||||
expectPendingPods: true,
|
|
||||||
expectError: true,
|
|
||||||
expectedError: nil,
|
|
||||||
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
|
|
||||||
return nil, errors.New("This is a random error for testing")
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, test := range tests {
|
|
||||||
t.Run(test.description, func(t *testing.T) {
|
|
||||||
tf := cmdtesting.NewTestFactory()
|
|
||||||
defer tf.Cleanup()
|
|
||||||
|
|
||||||
o := DrainCmdOptions{
|
|
||||||
PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme),
|
|
||||||
}
|
|
||||||
o.Out = os.Stdout
|
|
||||||
|
|
||||||
o.ToPrinter = func(operation string) (printers.ResourcePrinterFunc, error) {
|
|
||||||
return func(obj runtime.Object, out io.Writer) error {
|
|
||||||
return nil
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
_, pods := createPods(false)
|
|
||||||
pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn)
|
|
||||||
|
|
||||||
if test.expectError {
|
|
||||||
if err == nil {
|
|
||||||
t.Fatalf("%s: unexpected non-error", test.description)
|
|
||||||
} else if test.expectedError != nil {
|
|
||||||
if *test.expectedError != err {
|
|
||||||
t.Fatalf("%s: the error does not match expected error", test.description)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !test.expectError && err != nil {
|
|
||||||
t.Fatalf("%s: unexpected error", test.description)
|
|
||||||
}
|
|
||||||
if test.expectPendingPods && len(pendingPods) == 0 {
|
|
||||||
t.Fatalf("%s: unexpected empty pods", test.description)
|
|
||||||
}
|
|
||||||
if !test.expectPendingPods && len(pendingPods) > 0 {
|
|
||||||
t.Fatalf("%s: unexpected pending pods", test.description)
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func createPods(ifCreateNewPods bool) (map[string]corev1.Pod, []corev1.Pod) {
|
|
||||||
podMap := make(map[string]corev1.Pod)
|
|
||||||
podSlice := []corev1.Pod{}
|
|
||||||
for i := 0; i < 8; i++ {
|
|
||||||
var uid types.UID
|
|
||||||
if ifCreateNewPods {
|
|
||||||
uid = types.UID(i)
|
|
||||||
} else {
|
|
||||||
uid = types.UID(strconv.Itoa(i) + strconv.Itoa(i))
|
|
||||||
}
|
|
||||||
pod := corev1.Pod{
|
|
||||||
ObjectMeta: metav1.ObjectMeta{
|
|
||||||
Name: "pod" + strconv.Itoa(i),
|
|
||||||
Namespace: "default",
|
|
||||||
UID: uid,
|
|
||||||
Generation: int64(i),
|
|
||||||
},
|
|
||||||
}
|
|
||||||
podMap[pod.Name] = pod
|
|
||||||
podSlice = append(podSlice, pod)
|
|
||||||
}
|
|
||||||
return podMap, podSlice
|
|
||||||
}
|
|
||||||
|
|
||||||
type MyReq struct {
|
type MyReq struct {
|
||||||
Request *http.Request
|
Request *http.Request
|
||||||
}
|
}
|
||||||
|
@ -1,9 +1,10 @@
|
|||||||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
|
||||||
|
|
||||||
go_library(
|
go_library(
|
||||||
name = "go_default_library",
|
name = "go_default_library",
|
||||||
srcs = [
|
srcs = [
|
||||||
"cordon.go",
|
"cordon.go",
|
||||||
|
"default.go",
|
||||||
"drain.go",
|
"drain.go",
|
||||||
"filters.go",
|
"filters.go",
|
||||||
],
|
],
|
||||||
@ -11,6 +12,7 @@ go_library(
|
|||||||
importpath = "k8s.io/kubectl/pkg/drain",
|
importpath = "k8s.io/kubectl/pkg/drain",
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
deps = [
|
deps = [
|
||||||
|
# Please be wary of additional deps here ... this is intended to be usable as a library
|
||||||
"//staging/src/k8s.io/api/apps/v1:go_default_library",
|
"//staging/src/k8s.io/api/apps/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
||||||
@ -21,8 +23,10 @@ go_library(
|
|||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library",
|
||||||
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
"//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
@ -40,3 +44,21 @@ filegroup(
|
|||||||
tags = ["automanaged"],
|
tags = ["automanaged"],
|
||||||
visibility = ["//visibility:public"],
|
visibility = ["//visibility:public"],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
go_test(
|
||||||
|
name = "go_default_test",
|
||||||
|
srcs = ["drain_test.go"],
|
||||||
|
embed = [":go_default_library"],
|
||||||
|
deps = [
|
||||||
|
"//staging/src/k8s.io/api/core/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/api/policy/v1beta1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
|
||||||
|
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
|
||||||
|
"//staging/src/k8s.io/client-go/testing:go_default_library",
|
||||||
|
],
|
||||||
|
)
|
||||||
|
69
staging/src/k8s.io/kubectl/pkg/drain/default.go
Normal file
69
staging/src/k8s.io/kubectl/pkg/drain/default.go
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2019 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package drain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
|
)
|
||||||
|
|
||||||
|
// This file contains default implementations of how to
|
||||||
|
// drain/cordon/uncordon nodes. These functions may be called
|
||||||
|
// directly, or their functionality copied into your own code, for
|
||||||
|
// example if you want different output behaviour.
|
||||||
|
|
||||||
|
// RunNodeDrain shows the canonical way to drain a node.
|
||||||
|
// You should first cordon the node, e.g. using RunCordonOrUncordon
|
||||||
|
func RunNodeDrain(drainer *Helper, nodeName string) error {
|
||||||
|
// TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers
|
||||||
|
list, errs := drainer.GetPodsForDeletion(nodeName)
|
||||||
|
if errs != nil {
|
||||||
|
return utilerrors.NewAggregate(errs)
|
||||||
|
}
|
||||||
|
if warnings := list.Warnings(); warnings != "" {
|
||||||
|
fmt.Fprintf(drainer.ErrOut, "WARNING: %s\n", warnings)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := drainer.DeleteOrEvictPods(list.Pods()); err != nil {
|
||||||
|
// Maybe warn about non-deleted pods here
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunCordonOrUncordon demonstrates the canonical way to cordon or uncordon a Node
|
||||||
|
func RunCordonOrUncordon(drainer *Helper, node *corev1.Node, desired bool) error {
|
||||||
|
// TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers
|
||||||
|
c := NewCordonHelper(node)
|
||||||
|
|
||||||
|
if updateRequired := c.UpdateIfRequired(desired); !updateRequired {
|
||||||
|
// Already done
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
err, patchErr := c.PatchOrReplace(drainer.Client)
|
||||||
|
if patchErr != nil {
|
||||||
|
return patchErr
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
@ -17,14 +17,19 @@ limitations under the License.
|
|||||||
package drain
|
package drain
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"math"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
corev1 "k8s.io/api/core/v1"
|
corev1 "k8s.io/api/core/v1"
|
||||||
policyv1beta1 "k8s.io/api/policy/v1beta1"
|
policyv1beta1 "k8s.io/api/policy/v1beta1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/fields"
|
"k8s.io/apimachinery/pkg/fields"
|
||||||
"k8s.io/apimachinery/pkg/labels"
|
"k8s.io/apimachinery/pkg/labels"
|
||||||
|
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
"k8s.io/client-go/kubernetes"
|
"k8s.io/client-go/kubernetes"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -39,14 +44,20 @@ const (
|
|||||||
type Helper struct {
|
type Helper struct {
|
||||||
Client kubernetes.Interface
|
Client kubernetes.Interface
|
||||||
Force bool
|
Force bool
|
||||||
DryRun bool
|
|
||||||
GracePeriodSeconds int
|
GracePeriodSeconds int
|
||||||
IgnoreAllDaemonSets bool
|
IgnoreAllDaemonSets bool
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
DeleteLocalData bool
|
DeleteLocalData bool
|
||||||
Selector string
|
Selector string
|
||||||
PodSelector string
|
PodSelector string
|
||||||
|
Out io.Writer
|
||||||
ErrOut io.Writer
|
ErrOut io.Writer
|
||||||
|
|
||||||
|
// TODO(justinsb): unnecessary?
|
||||||
|
DryRun bool
|
||||||
|
|
||||||
|
// OnPodDeletedOrEvicted is called when a pod is evicted/deleted; for printing progress output
|
||||||
|
OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CheckEvictionSupport uses Discovery API to find out if the server support
|
// CheckEvictionSupport uses Discovery API to find out if the server support
|
||||||
@ -157,3 +168,125 @@ func (d *Helper) GetPodsForDeletion(nodeName string) (*podDeleteList, []error) {
|
|||||||
|
|
||||||
return list, nil
|
return list, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DeleteOrEvictPods deletes or evicts the pods on the api server
|
||||||
|
func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error {
|
||||||
|
if len(pods) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
policyGroupVersion, err := CheckEvictionSupport(d.Client)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(justinsb): unnecessary?
|
||||||
|
getPodFn := func(namespace, name string) (*corev1.Pod, error) {
|
||||||
|
return d.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{})
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(policyGroupVersion) > 0 {
|
||||||
|
return d.evictPods(pods, policyGroupVersion, getPodFn)
|
||||||
|
}
|
||||||
|
|
||||||
|
return d.deletePods(pods, getPodFn)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
|
||||||
|
returnCh := make(chan error, 1)
|
||||||
|
|
||||||
|
for _, pod := range pods {
|
||||||
|
go func(pod corev1.Pod, returnCh chan error) {
|
||||||
|
for {
|
||||||
|
fmt.Fprintf(d.Out, "evicting pod %q\n", pod.Name)
|
||||||
|
err := d.EvictPod(pod, policyGroupVersion)
|
||||||
|
if err == nil {
|
||||||
|
break
|
||||||
|
} else if apierrors.IsNotFound(err) {
|
||||||
|
returnCh <- nil
|
||||||
|
return
|
||||||
|
} else if apierrors.IsTooManyRequests(err) {
|
||||||
|
fmt.Fprintf(d.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err)
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
} else {
|
||||||
|
returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err := waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted)
|
||||||
|
if err == nil {
|
||||||
|
returnCh <- nil
|
||||||
|
} else {
|
||||||
|
returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
|
||||||
|
}
|
||||||
|
}(pod, returnCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
doneCount := 0
|
||||||
|
var errors []error
|
||||||
|
|
||||||
|
// 0 timeout means infinite, we use MaxInt64 to represent it.
|
||||||
|
var globalTimeout time.Duration
|
||||||
|
if d.Timeout == 0 {
|
||||||
|
globalTimeout = time.Duration(math.MaxInt64)
|
||||||
|
} else {
|
||||||
|
globalTimeout = d.Timeout
|
||||||
|
}
|
||||||
|
globalTimeoutCh := time.After(globalTimeout)
|
||||||
|
numPods := len(pods)
|
||||||
|
for doneCount < numPods {
|
||||||
|
select {
|
||||||
|
case err := <-returnCh:
|
||||||
|
doneCount++
|
||||||
|
if err != nil {
|
||||||
|
errors = append(errors, err)
|
||||||
|
}
|
||||||
|
case <-globalTimeoutCh:
|
||||||
|
return fmt.Errorf("drain did not complete within %v", globalTimeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return utilerrors.NewAggregate(errors)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
|
||||||
|
// 0 timeout means infinite, we use MaxInt64 to represent it.
|
||||||
|
var globalTimeout time.Duration
|
||||||
|
if d.Timeout == 0 {
|
||||||
|
globalTimeout = time.Duration(math.MaxInt64)
|
||||||
|
} else {
|
||||||
|
globalTimeout = d.Timeout
|
||||||
|
}
|
||||||
|
for _, pod := range pods {
|
||||||
|
err := d.DeletePod(pod)
|
||||||
|
if err != nil && !apierrors.IsNotFound(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) {
|
||||||
|
err := wait.PollImmediate(interval, timeout, func() (bool, error) {
|
||||||
|
pendingPods := []corev1.Pod{}
|
||||||
|
for i, pod := range pods {
|
||||||
|
p, err := getPodFn(pod.Namespace, pod.Name)
|
||||||
|
if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
|
||||||
|
if onDoneFn != nil {
|
||||||
|
onDoneFn(&pod, usingEviction)
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
} else if err != nil {
|
||||||
|
return false, err
|
||||||
|
} else {
|
||||||
|
pendingPods = append(pendingPods, pods[i])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pods = pendingPods
|
||||||
|
if len(pendingPods) > 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
return pods, err
|
||||||
|
}
|
||||||
|
306
staging/src/k8s.io/kubectl/pkg/drain/drain_test.go
Normal file
306
staging/src/k8s.io/kubectl/pkg/drain/drain_test.go
Normal file
@ -0,0 +1,306 @@
|
|||||||
|
/*
|
||||||
|
Copyright 2015 The Kubernetes Authors.
|
||||||
|
|
||||||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
you may not use this file except in compliance with the License.
|
||||||
|
You may obtain a copy of the License at
|
||||||
|
|
||||||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
|
||||||
|
Unless required by applicable law or agreed to in writing, software
|
||||||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
See the License for the specific language governing permissions and
|
||||||
|
limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package drain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"reflect"
|
||||||
|
"sort"
|
||||||
|
"strconv"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
corev1 "k8s.io/api/core/v1"
|
||||||
|
policyv1beta1 "k8s.io/api/policy/v1beta1"
|
||||||
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
|
"k8s.io/apimachinery/pkg/types"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/client-go/kubernetes/fake"
|
||||||
|
ktest "k8s.io/client-go/testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestDeletePods(t *testing.T) {
|
||||||
|
ifHasBeenCalled := map[string]bool{}
|
||||||
|
tests := []struct {
|
||||||
|
description string
|
||||||
|
interval time.Duration
|
||||||
|
timeout time.Duration
|
||||||
|
expectPendingPods bool
|
||||||
|
expectError bool
|
||||||
|
expectedError *error
|
||||||
|
getPodFn func(namespace, name string) (*corev1.Pod, error)
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
description: "Wait for deleting to complete",
|
||||||
|
interval: 100 * time.Millisecond,
|
||||||
|
timeout: 10 * time.Second,
|
||||||
|
expectPendingPods: false,
|
||||||
|
expectError: false,
|
||||||
|
expectedError: nil,
|
||||||
|
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
|
||||||
|
oldPodMap, _ := createPods(false)
|
||||||
|
newPodMap, _ := createPods(true)
|
||||||
|
if oldPod, found := oldPodMap[name]; found {
|
||||||
|
if _, ok := ifHasBeenCalled[name]; !ok {
|
||||||
|
ifHasBeenCalled[name] = true
|
||||||
|
return &oldPod, nil
|
||||||
|
}
|
||||||
|
if oldPod.ObjectMeta.Generation < 4 {
|
||||||
|
newPod := newPodMap[name]
|
||||||
|
return &newPod, nil
|
||||||
|
}
|
||||||
|
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
|
||||||
|
|
||||||
|
}
|
||||||
|
return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Deleting could timeout",
|
||||||
|
interval: 200 * time.Millisecond,
|
||||||
|
timeout: 3 * time.Second,
|
||||||
|
expectPendingPods: true,
|
||||||
|
expectError: true,
|
||||||
|
expectedError: &wait.ErrWaitTimeout,
|
||||||
|
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
|
||||||
|
oldPodMap, _ := createPods(false)
|
||||||
|
if oldPod, found := oldPodMap[name]; found {
|
||||||
|
return &oldPod, nil
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("%q: not found", name)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
description: "Client error could be passed out",
|
||||||
|
interval: 200 * time.Millisecond,
|
||||||
|
timeout: 5 * time.Second,
|
||||||
|
expectPendingPods: true,
|
||||||
|
expectError: true,
|
||||||
|
expectedError: nil,
|
||||||
|
getPodFn: func(namespace, name string) (*corev1.Pod, error) {
|
||||||
|
return nil, errors.New("This is a random error for testing")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, test := range tests {
|
||||||
|
t.Run(test.description, func(t *testing.T) {
|
||||||
|
_, pods := createPods(false)
|
||||||
|
pendingPods, err := waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn, nil)
|
||||||
|
|
||||||
|
if test.expectError {
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("%s: unexpected non-error", test.description)
|
||||||
|
} else if test.expectedError != nil {
|
||||||
|
if *test.expectedError != err {
|
||||||
|
t.Fatalf("%s: the error does not match expected error", test.description)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !test.expectError && err != nil {
|
||||||
|
t.Fatalf("%s: unexpected error", test.description)
|
||||||
|
}
|
||||||
|
if test.expectPendingPods && len(pendingPods) == 0 {
|
||||||
|
t.Fatalf("%s: unexpected empty pods", test.description)
|
||||||
|
}
|
||||||
|
if !test.expectPendingPods && len(pendingPods) > 0 {
|
||||||
|
t.Fatalf("%s: unexpected pending pods", test.description)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func createPods(ifCreateNewPods bool) (map[string]corev1.Pod, []corev1.Pod) {
|
||||||
|
podMap := make(map[string]corev1.Pod)
|
||||||
|
podSlice := []corev1.Pod{}
|
||||||
|
for i := 0; i < 8; i++ {
|
||||||
|
var uid types.UID
|
||||||
|
if ifCreateNewPods {
|
||||||
|
uid = types.UID(i)
|
||||||
|
} else {
|
||||||
|
uid = types.UID(strconv.Itoa(i) + strconv.Itoa(i))
|
||||||
|
}
|
||||||
|
pod := corev1.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: "pod" + strconv.Itoa(i),
|
||||||
|
Namespace: "default",
|
||||||
|
UID: uid,
|
||||||
|
Generation: int64(i),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
podMap[pod.Name] = pod
|
||||||
|
podSlice = append(podSlice, pod)
|
||||||
|
}
|
||||||
|
return podMap, podSlice
|
||||||
|
}
|
||||||
|
|
||||||
|
// addEvictionSupport implements simple fake eviction support on the fake.Clientset
|
||||||
|
func addEvictionSupport(t *testing.T, k *fake.Clientset) {
|
||||||
|
podsEviction := metav1.APIResource{
|
||||||
|
Name: "pods/eviction",
|
||||||
|
Kind: "Eviction",
|
||||||
|
Group: "",
|
||||||
|
Version: "v1",
|
||||||
|
}
|
||||||
|
coreResources := &metav1.APIResourceList{
|
||||||
|
GroupVersion: "v1",
|
||||||
|
APIResources: []metav1.APIResource{podsEviction},
|
||||||
|
}
|
||||||
|
|
||||||
|
policyResources := &metav1.APIResourceList{
|
||||||
|
GroupVersion: "policy/v1",
|
||||||
|
}
|
||||||
|
k.Resources = append(k.Resources, coreResources, policyResources)
|
||||||
|
|
||||||
|
// Delete pods when evict is called
|
||||||
|
k.PrependReactor("create", "pods", func(action ktest.Action) (bool, runtime.Object, error) {
|
||||||
|
if action.GetSubresource() != "eviction" {
|
||||||
|
return false, nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
eviction := *action.(ktest.CreateAction).GetObject().(*policyv1beta1.Eviction)
|
||||||
|
// Avoid the lock
|
||||||
|
go func() {
|
||||||
|
err := k.CoreV1().Pods(eviction.Namespace).Delete(eviction.Name, &metav1.DeleteOptions{})
|
||||||
|
if err != nil {
|
||||||
|
// Errorf because we can't call Fatalf from another goroutine
|
||||||
|
t.Errorf("failed to delete pod: %s/%s", eviction.Namespace, eviction.Name)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
return true, nil, nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCheckEvictionSupport(t *testing.T) {
|
||||||
|
for _, evictionSupported := range []bool{true, false} {
|
||||||
|
evictionSupported := evictionSupported
|
||||||
|
t.Run(fmt.Sprintf("evictionSupported=%v", evictionSupported),
|
||||||
|
func(t *testing.T) {
|
||||||
|
k := fake.NewSimpleClientset()
|
||||||
|
if evictionSupported {
|
||||||
|
addEvictionSupport(t, k)
|
||||||
|
}
|
||||||
|
|
||||||
|
apiGroup, err := CheckEvictionSupport(k)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected error: %v", err)
|
||||||
|
}
|
||||||
|
expectedAPIGroup := ""
|
||||||
|
if evictionSupported {
|
||||||
|
expectedAPIGroup = "policy/v1"
|
||||||
|
}
|
||||||
|
if apiGroup != expectedAPIGroup {
|
||||||
|
t.Fatalf("expected apigroup %q, actual=%q", expectedAPIGroup, apiGroup)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeleteOrEvict(t *testing.T) {
|
||||||
|
for _, evictionSupported := range []bool{true, false} {
|
||||||
|
evictionSupported := evictionSupported
|
||||||
|
t.Run(fmt.Sprintf("evictionSupported=%v", evictionSupported),
|
||||||
|
func(t *testing.T) {
|
||||||
|
h := &Helper{
|
||||||
|
Out: os.Stdout,
|
||||||
|
GracePeriodSeconds: 10,
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create 4 pods, and try to remove the first 2
|
||||||
|
var expectedEvictions []policyv1beta1.Eviction
|
||||||
|
var create []runtime.Object
|
||||||
|
deletePods := []corev1.Pod{}
|
||||||
|
for i := 1; i <= 4; i++ {
|
||||||
|
pod := &corev1.Pod{}
|
||||||
|
pod.Name = fmt.Sprintf("mypod-%d", i)
|
||||||
|
pod.Namespace = "default"
|
||||||
|
|
||||||
|
create = append(create, pod)
|
||||||
|
if i <= 2 {
|
||||||
|
deletePods = append(deletePods, *pod)
|
||||||
|
|
||||||
|
if evictionSupported {
|
||||||
|
eviction := policyv1beta1.Eviction{}
|
||||||
|
eviction.Kind = "Eviction"
|
||||||
|
eviction.APIVersion = "policy/v1"
|
||||||
|
eviction.Namespace = pod.Namespace
|
||||||
|
eviction.Name = pod.Name
|
||||||
|
|
||||||
|
gracePeriodSeconds := int64(h.GracePeriodSeconds)
|
||||||
|
eviction.DeleteOptions = &metav1.DeleteOptions{
|
||||||
|
GracePeriodSeconds: &gracePeriodSeconds,
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedEvictions = append(expectedEvictions, eviction)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the fake client
|
||||||
|
k := fake.NewSimpleClientset(create...)
|
||||||
|
if evictionSupported {
|
||||||
|
addEvictionSupport(t, k)
|
||||||
|
}
|
||||||
|
h.Client = k
|
||||||
|
|
||||||
|
// Do the eviction
|
||||||
|
if err := h.DeleteOrEvictPods(deletePods); err != nil {
|
||||||
|
t.Fatalf("error from DeleteOrEvictPods: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that other pods are still there
|
||||||
|
var remainingPods []string
|
||||||
|
{
|
||||||
|
podList, err := k.CoreV1().Pods("").List(metav1.ListOptions{})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("error listing pods: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pod := range podList.Items {
|
||||||
|
remainingPods = append(remainingPods, pod.Namespace+"/"+pod.Name)
|
||||||
|
}
|
||||||
|
sort.Strings(remainingPods)
|
||||||
|
}
|
||||||
|
expected := []string{"default/mypod-3", "default/mypod-4"}
|
||||||
|
if !reflect.DeepEqual(remainingPods, expected) {
|
||||||
|
t.Errorf("unexpected remaining pods after DeleteOrEvictPods; actual %v; expected %v", remainingPods, expected)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that pods were evicted as expected
|
||||||
|
var actualEvictions []policyv1beta1.Eviction
|
||||||
|
for _, action := range k.Actions() {
|
||||||
|
if action.GetVerb() != "create" || action.GetResource().Resource != "pods" || action.GetSubresource() != "eviction" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
eviction := *action.(ktest.CreateAction).GetObject().(*policyv1beta1.Eviction)
|
||||||
|
actualEvictions = append(actualEvictions, eviction)
|
||||||
|
}
|
||||||
|
sort.Slice(actualEvictions, func(i, j int) bool {
|
||||||
|
return actualEvictions[i].Name < actualEvictions[j].Name
|
||||||
|
})
|
||||||
|
if !reflect.DeepEqual(actualEvictions, expectedEvictions) {
|
||||||
|
t.Errorf("unexpected evictions; actual %v; expected %v", actualEvictions, expectedEvictions)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user