From c74f2f6a72bf6fe2718d1e386e4e08747f34fe7b Mon Sep 17 00:00:00 2001 From: Justin SB Date: Thu, 11 Jul 2019 13:28:25 -0400 Subject: [PATCH 1/2] Make drain library more reusable Move more functionality from the kubectl cmd to a package with fewer dependencies. --- .../src/k8s.io/kubectl/pkg/cmd/drain/BUILD | 7 - .../src/k8s.io/kubectl/pkg/cmd/drain/drain.go | 159 +++--------------- .../kubectl/pkg/cmd/drain/drain_test.go | 137 --------------- staging/src/k8s.io/kubectl/pkg/drain/BUILD | 20 ++- .../src/k8s.io/kubectl/pkg/drain/default.go | 67 ++++++++ staging/src/k8s.io/kubectl/pkg/drain/drain.go | 135 ++++++++++++++- .../k8s.io/kubectl/pkg/drain/drain_test.go | 147 ++++++++++++++++ 7 files changed, 389 insertions(+), 283 deletions(-) create mode 100644 staging/src/k8s.io/kubectl/pkg/drain/default.go create mode 100644 staging/src/k8s.io/kubectl/pkg/drain/drain_test.go diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/drain/BUILD b/staging/src/k8s.io/kubectl/pkg/cmd/drain/BUILD index 8dc7fbdf546..5ff1749dc5e 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/drain/BUILD +++ b/staging/src/k8s.io/kubectl/pkg/cmd/drain/BUILD @@ -8,13 +8,10 @@ go_library( visibility = ["//visibility:public"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/labels:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/resource:go_default_library", @@ -36,15 +33,11 @@ go_test( "//staging/src/k8s.io/api/batch/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", - "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/cli-runtime/pkg/genericclioptions:go_default_library", - "//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library", "//staging/src/k8s.io/client-go/rest/fake:go_default_library", "//staging/src/k8s.io/kubectl/pkg/cmd/testing:go_default_library", "//staging/src/k8s.io/kubectl/pkg/cmd/util:go_default_library", diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain.go b/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain.go index 985d6553d7a..1a66d5b99d9 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain.go @@ -19,19 +19,14 @@ package drain import ( "errors" "fmt" - "math" - "time" "github.com/spf13/cobra" corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/cli-runtime/pkg/printers" @@ -146,14 +141,34 @@ var ( ) func NewDrainCmdOptions(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *DrainCmdOptions { - return &DrainCmdOptions{ + o := &DrainCmdOptions{ PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme), IOStreams: ioStreams, drainer: &drain.Helper{ GracePeriodSeconds: -1, + Out: ioStreams.Out, ErrOut: ioStreams.ErrOut, }, } + o.drainer.OnPodDeletedOrEvicted = o.onPodDeletedOrEvicted + return o +} + +// onPodDeletedOrEvicted is called by drain.Helper, when the pod has been deleted or evicted +func (o *DrainCmdOptions) onPodDeletedOrEvicted(pod *corev1.Pod, usingEviction bool) { + var verbStr string + if usingEviction { + verbStr = "evicted" + } else { + verbStr = "deleted" + } + printObj, err := o.ToPrinter(verbStr) + if err != nil { + fmt.Fprintf(o.ErrOut, "error building printer: %v\n", err) + fmt.Fprintf(o.Out, "pod %s/%s %s\n", pod.Namespace, pod.Name, verbStr) + } else { + printObj(pod, o.Out) + } } func NewCmdDrain(f cmdutil.Factory, ioStreams genericclioptions.IOStreams) *cobra.Command { @@ -313,7 +328,7 @@ func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error fmt.Fprintf(o.ErrOut, "WARNING: %s\n", warnings) } - if err := o.deleteOrEvictPods(list.Pods()); err != nil { + if err := o.drainer.DeleteOrEvictPods(list.Pods()); err != nil { pendingList, newErrs := o.drainer.GetPodsForDeletion(nodeInfo.Name) fmt.Fprintf(o.ErrOut, "There are pending pods in node %q when an error occurred: %v\n", nodeInfo.Name, err) @@ -328,136 +343,6 @@ func (o *DrainCmdOptions) deleteOrEvictPodsSimple(nodeInfo *resource.Info) error return nil } -// deleteOrEvictPods deletes or evicts the pods on the api server -func (o *DrainCmdOptions) deleteOrEvictPods(pods []corev1.Pod) error { - if len(pods) == 0 { - return nil - } - - policyGroupVersion, err := drain.CheckEvictionSupport(o.drainer.Client) - if err != nil { - return err - } - - getPodFn := func(namespace, name string) (*corev1.Pod, error) { - return o.drainer.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) - } - - if len(policyGroupVersion) > 0 { - return o.evictPods(pods, policyGroupVersion, getPodFn) - } else { - return o.deletePods(pods, getPodFn) - } -} - -func (o *DrainCmdOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { - returnCh := make(chan error, 1) - - for _, pod := range pods { - go func(pod corev1.Pod, returnCh chan error) { - for { - fmt.Fprintf(o.Out, "evicting pod %q\n", pod.Name) - err := o.drainer.EvictPod(pod, policyGroupVersion) - if err == nil { - break - } else if apierrors.IsNotFound(err) { - returnCh <- nil - return - } else if apierrors.IsTooManyRequests(err) { - fmt.Fprintf(o.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err) - time.Sleep(5 * time.Second) - } else { - returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) - return - } - } - _, err := o.waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn) - if err == nil { - returnCh <- nil - } else { - returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) - } - }(pod, returnCh) - } - - doneCount := 0 - var errors []error - - // 0 timeout means infinite, we use MaxInt64 to represent it. - var globalTimeout time.Duration - if o.drainer.Timeout == 0 { - globalTimeout = time.Duration(math.MaxInt64) - } else { - globalTimeout = o.drainer.Timeout - } - globalTimeoutCh := time.After(globalTimeout) - numPods := len(pods) - for doneCount < numPods { - select { - case err := <-returnCh: - doneCount++ - if err != nil { - errors = append(errors, err) - } - case <-globalTimeoutCh: - return fmt.Errorf("drain did not complete within %v", globalTimeout) - } - } - return utilerrors.NewAggregate(errors) -} - -func (o *DrainCmdOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { - // 0 timeout means infinite, we use MaxInt64 to represent it. - var globalTimeout time.Duration - if o.drainer.Timeout == 0 { - globalTimeout = time.Duration(math.MaxInt64) - } else { - globalTimeout = o.drainer.Timeout - } - for _, pod := range pods { - err := o.drainer.DeletePod(pod) - if err != nil && !apierrors.IsNotFound(err) { - return err - } - } - _, err := o.waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn) - return err -} - -func (o *DrainCmdOptions) waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error)) ([]corev1.Pod, error) { - var verbStr string - if usingEviction { - verbStr = "evicted" - } else { - verbStr = "deleted" - } - printObj, err := o.ToPrinter(verbStr) - if err != nil { - return pods, err - } - - err = wait.PollImmediate(interval, timeout, func() (bool, error) { - pendingPods := []corev1.Pod{} - for i, pod := range pods { - p, err := getPodFn(pod.Namespace, pod.Name) - if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { - printObj(&pod, o.Out) - continue - } else if err != nil { - return false, err - } else { - pendingPods = append(pendingPods, pods[i]) - } - } - pods = pendingPods - if len(pendingPods) > 0 { - return false, nil - } - return true, nil - }) - return pods, err -} - // RunCordonOrUncordon runs either Cordon or Uncordon. The desired value for // "Unschedulable" is passed as the first arg. func (o *DrainCmdOptions) RunCordonOrUncordon(desired bool) error { diff --git a/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain_test.go b/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain_test.go index 94e1e1ab9d9..1bea7c5a719 100644 --- a/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain_test.go +++ b/staging/src/k8s.io/kubectl/pkg/cmd/drain/drain_test.go @@ -17,15 +17,11 @@ limitations under the License. package drain import ( - "errors" - "fmt" - "io" "io/ioutil" "net/http" "net/url" "os" "reflect" - "strconv" "strings" "sync/atomic" "testing" @@ -38,14 +34,10 @@ import ( batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" - apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/strategicpatch" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/cli-runtime/pkg/printers" "k8s.io/client-go/rest/fake" cmdtesting "k8s.io/kubectl/pkg/cmd/testing" cmdutil "k8s.io/kubectl/pkg/cmd/util" @@ -907,135 +899,6 @@ func TestDrain(t *testing.T) { } } -func TestDeletePods(t *testing.T) { - ifHasBeenCalled := map[string]bool{} - tests := []struct { - description string - interval time.Duration - timeout time.Duration - expectPendingPods bool - expectError bool - expectedError *error - getPodFn func(namespace, name string) (*corev1.Pod, error) - }{ - { - description: "Wait for deleting to complete", - interval: 100 * time.Millisecond, - timeout: 10 * time.Second, - expectPendingPods: false, - expectError: false, - expectedError: nil, - getPodFn: func(namespace, name string) (*corev1.Pod, error) { - oldPodMap, _ := createPods(false) - newPodMap, _ := createPods(true) - if oldPod, found := oldPodMap[name]; found { - if _, ok := ifHasBeenCalled[name]; !ok { - ifHasBeenCalled[name] = true - return &oldPod, nil - } - if oldPod.ObjectMeta.Generation < 4 { - newPod := newPodMap[name] - return &newPod, nil - } - return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name) - - } - return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name) - }, - }, - { - description: "Deleting could timeout", - interval: 200 * time.Millisecond, - timeout: 3 * time.Second, - expectPendingPods: true, - expectError: true, - expectedError: &wait.ErrWaitTimeout, - getPodFn: func(namespace, name string) (*corev1.Pod, error) { - oldPodMap, _ := createPods(false) - if oldPod, found := oldPodMap[name]; found { - return &oldPod, nil - } - return nil, fmt.Errorf("%q: not found", name) - }, - }, - { - description: "Client error could be passed out", - interval: 200 * time.Millisecond, - timeout: 5 * time.Second, - expectPendingPods: true, - expectError: true, - expectedError: nil, - getPodFn: func(namespace, name string) (*corev1.Pod, error) { - return nil, errors.New("This is a random error for testing") - }, - }, - } - - for _, test := range tests { - t.Run(test.description, func(t *testing.T) { - tf := cmdtesting.NewTestFactory() - defer tf.Cleanup() - - o := DrainCmdOptions{ - PrintFlags: genericclioptions.NewPrintFlags("drained").WithTypeSetter(scheme.Scheme), - } - o.Out = os.Stdout - - o.ToPrinter = func(operation string) (printers.ResourcePrinterFunc, error) { - return func(obj runtime.Object, out io.Writer) error { - return nil - }, nil - } - - _, pods := createPods(false) - pendingPods, err := o.waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn) - - if test.expectError { - if err == nil { - t.Fatalf("%s: unexpected non-error", test.description) - } else if test.expectedError != nil { - if *test.expectedError != err { - t.Fatalf("%s: the error does not match expected error", test.description) - } - } - } - if !test.expectError && err != nil { - t.Fatalf("%s: unexpected error", test.description) - } - if test.expectPendingPods && len(pendingPods) == 0 { - t.Fatalf("%s: unexpected empty pods", test.description) - } - if !test.expectPendingPods && len(pendingPods) > 0 { - t.Fatalf("%s: unexpected pending pods", test.description) - } - }) - } -} - -func createPods(ifCreateNewPods bool) (map[string]corev1.Pod, []corev1.Pod) { - podMap := make(map[string]corev1.Pod) - podSlice := []corev1.Pod{} - for i := 0; i < 8; i++ { - var uid types.UID - if ifCreateNewPods { - uid = types.UID(i) - } else { - uid = types.UID(strconv.Itoa(i) + strconv.Itoa(i)) - } - pod := corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: "pod" + strconv.Itoa(i), - Namespace: "default", - UID: uid, - Generation: int64(i), - }, - } - podMap[pod.Name] = pod - podSlice = append(podSlice, pod) - } - return podMap, podSlice -} - type MyReq struct { Request *http.Request } diff --git a/staging/src/k8s.io/kubectl/pkg/drain/BUILD b/staging/src/k8s.io/kubectl/pkg/drain/BUILD index 685fabd49f8..6bee4c97b96 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/BUILD +++ b/staging/src/k8s.io/kubectl/pkg/drain/BUILD @@ -1,9 +1,10 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ "cordon.go", + "default.go", "drain.go", "filters.go", ], @@ -11,6 +12,7 @@ go_library( importpath = "k8s.io/kubectl/pkg/drain", visibility = ["//visibility:public"], deps = [ + # Please be wary of additional deps here ... this is intended to be usable as a library "//staging/src/k8s.io/api/apps/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", @@ -21,8 +23,10 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/json:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/strategicpatch:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", ], ) @@ -40,3 +44,17 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) + +go_test( + name = "go_default_test", + srcs = ["drain_test.go"], + embed = [":go_default_library"], + deps = [ + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + ], +) diff --git a/staging/src/k8s.io/kubectl/pkg/drain/default.go b/staging/src/k8s.io/kubectl/pkg/drain/default.go new file mode 100644 index 00000000000..3d118f3b2a4 --- /dev/null +++ b/staging/src/k8s.io/kubectl/pkg/drain/default.go @@ -0,0 +1,67 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "fmt" + + corev1 "k8s.io/api/core/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" +) + +// This file contains default implementations of how to +// drain/cordon/uncordon nodes. These functions may be called +// directly, or their functionality copied into your own code, for +// example if you want different output behaviour. + +// RunNodeDrain shows the canonical way to drain a node. +// You should first cordon the node, e.g. using RunCordonOrUncordon +func RunNodeDrain(drainer *Helper, nodeName string) error { + list, errs := drainer.GetPodsForDeletion(nodeName) + if errs != nil { + return utilerrors.NewAggregate(errs) + } + if warnings := list.Warnings(); warnings != "" { + fmt.Fprintf(drainer.ErrOut, "WARNING: %s\n", warnings) + } + + if err := drainer.DeleteOrEvictPods(list.Pods()); err != nil { + // Maybe warn about non-deleted pods here + return err + } + return nil +} + +// RunCordonOrUncordon demonstrates the canonical way to cordon or uncordon a Node +func RunCordonOrUncordon(drainer *Helper, node *corev1.Node, desired bool) error { + c := NewCordonHelper(node) + + if updateRequired := c.UpdateIfRequired(desired); !updateRequired { + // Already done + return nil + } + + err, patchErr := c.PatchOrReplace(drainer.Client) + if patchErr != nil { + return patchErr + } + if err != nil { + return err + } + + return nil +} diff --git a/staging/src/k8s.io/kubectl/pkg/drain/drain.go b/staging/src/k8s.io/kubectl/pkg/drain/drain.go index 22069ee66d5..2e9c9e8c49a 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/drain.go +++ b/staging/src/k8s.io/kubectl/pkg/drain/drain.go @@ -17,14 +17,19 @@ limitations under the License. package drain import ( + "fmt" "io" + "math" "time" corev1 "k8s.io/api/core/v1" policyv1beta1 "k8s.io/api/policy/v1beta1" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" ) @@ -39,14 +44,20 @@ const ( type Helper struct { Client kubernetes.Interface Force bool - DryRun bool GracePeriodSeconds int IgnoreAllDaemonSets bool Timeout time.Duration DeleteLocalData bool Selector string PodSelector string + Out io.Writer ErrOut io.Writer + + // TODO(justinsb): unnecessary? + DryRun bool + + // OnPodDeletedOrEvicted is called when a pod is evicted/deleted; for printing progress output + OnPodDeletedOrEvicted func(pod *corev1.Pod, usingEviction bool) } // CheckEvictionSupport uses Discovery API to find out if the server support @@ -157,3 +168,125 @@ func (d *Helper) GetPodsForDeletion(nodeName string) (*podDeleteList, []error) { return list, nil } + +// DeleteOrEvictPods deletes or evicts the pods on the api server +func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error { + if len(pods) == 0 { + return nil + } + + policyGroupVersion, err := CheckEvictionSupport(d.Client) + if err != nil { + return err + } + + // TODO(justinsb): unnecessary? + getPodFn := func(namespace, name string) (*corev1.Pod, error) { + return d.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) + } + + if len(policyGroupVersion) > 0 { + return d.evictPods(pods, policyGroupVersion, getPodFn) + } + + return d.deletePods(pods, getPodFn) +} + +func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + returnCh := make(chan error, 1) + + for _, pod := range pods { + go func(pod corev1.Pod, returnCh chan error) { + for { + fmt.Fprintf(d.Out, "evicting pod %q\n", pod.Name) + err := d.EvictPod(pod, policyGroupVersion) + if err == nil { + break + } else if apierrors.IsNotFound(err) { + returnCh <- nil + return + } else if apierrors.IsTooManyRequests(err) { + fmt.Fprintf(d.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err) + time.Sleep(5 * time.Second) + } else { + returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) + return + } + } + _, err := waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted) + if err == nil { + returnCh <- nil + } else { + returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) + } + }(pod, returnCh) + } + + doneCount := 0 + var errors []error + + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if d.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = d.Timeout + } + globalTimeoutCh := time.After(globalTimeout) + numPods := len(pods) + for doneCount < numPods { + select { + case err := <-returnCh: + doneCount++ + if err != nil { + errors = append(errors, err) + } + case <-globalTimeoutCh: + return fmt.Errorf("drain did not complete within %v", globalTimeout) + } + } + return utilerrors.NewAggregate(errors) +} + +func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if d.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = d.Timeout + } + for _, pod := range pods { + err := d.DeletePod(pod) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + } + _, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted) + return err +} + +func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) { + err := wait.PollImmediate(interval, timeout, func() (bool, error) { + pendingPods := []corev1.Pod{} + for i, pod := range pods { + p, err := getPodFn(pod.Namespace, pod.Name) + if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) { + if onDoneFn != nil { + onDoneFn(&pod, usingEviction) + } + continue + } else if err != nil { + return false, err + } else { + pendingPods = append(pendingPods, pods[i]) + } + } + pods = pendingPods + if len(pendingPods) > 0 { + return false, nil + } + return true, nil + }) + return pods, err +} diff --git a/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go b/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go new file mode 100644 index 00000000000..3f5a4467e99 --- /dev/null +++ b/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go @@ -0,0 +1,147 @@ +/* +Copyright 2015 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package drain + +import ( + "errors" + "fmt" + "strconv" + "testing" + "time" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" +) + +func TestDeletePods(t *testing.T) { + ifHasBeenCalled := map[string]bool{} + tests := []struct { + description string + interval time.Duration + timeout time.Duration + expectPendingPods bool + expectError bool + expectedError *error + getPodFn func(namespace, name string) (*corev1.Pod, error) + }{ + { + description: "Wait for deleting to complete", + interval: 100 * time.Millisecond, + timeout: 10 * time.Second, + expectPendingPods: false, + expectError: false, + expectedError: nil, + getPodFn: func(namespace, name string) (*corev1.Pod, error) { + oldPodMap, _ := createPods(false) + newPodMap, _ := createPods(true) + if oldPod, found := oldPodMap[name]; found { + if _, ok := ifHasBeenCalled[name]; !ok { + ifHasBeenCalled[name] = true + return &oldPod, nil + } + if oldPod.ObjectMeta.Generation < 4 { + newPod := newPodMap[name] + return &newPod, nil + } + return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name) + + } + return nil, apierrors.NewNotFound(schema.GroupResource{Resource: "pods"}, name) + }, + }, + { + description: "Deleting could timeout", + interval: 200 * time.Millisecond, + timeout: 3 * time.Second, + expectPendingPods: true, + expectError: true, + expectedError: &wait.ErrWaitTimeout, + getPodFn: func(namespace, name string) (*corev1.Pod, error) { + oldPodMap, _ := createPods(false) + if oldPod, found := oldPodMap[name]; found { + return &oldPod, nil + } + return nil, fmt.Errorf("%q: not found", name) + }, + }, + { + description: "Client error could be passed out", + interval: 200 * time.Millisecond, + timeout: 5 * time.Second, + expectPendingPods: true, + expectError: true, + expectedError: nil, + getPodFn: func(namespace, name string) (*corev1.Pod, error) { + return nil, errors.New("This is a random error for testing") + }, + }, + } + + for _, test := range tests { + t.Run(test.description, func(t *testing.T) { + _, pods := createPods(false) + pendingPods, err := waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn, nil) + + if test.expectError { + if err == nil { + t.Fatalf("%s: unexpected non-error", test.description) + } else if test.expectedError != nil { + if *test.expectedError != err { + t.Fatalf("%s: the error does not match expected error", test.description) + } + } + } + if !test.expectError && err != nil { + t.Fatalf("%s: unexpected error", test.description) + } + if test.expectPendingPods && len(pendingPods) == 0 { + t.Fatalf("%s: unexpected empty pods", test.description) + } + if !test.expectPendingPods && len(pendingPods) > 0 { + t.Fatalf("%s: unexpected pending pods", test.description) + } + }) + } +} + +func createPods(ifCreateNewPods bool) (map[string]corev1.Pod, []corev1.Pod) { + podMap := make(map[string]corev1.Pod) + podSlice := []corev1.Pod{} + for i := 0; i < 8; i++ { + var uid types.UID + if ifCreateNewPods { + uid = types.UID(i) + } else { + uid = types.UID(strconv.Itoa(i) + strconv.Itoa(i)) + } + pod := corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: "pod" + strconv.Itoa(i), + Namespace: "default", + UID: uid, + Generation: int64(i), + }, + } + podMap[pod.Name] = pod + podSlice = append(podSlice, pod) + } + return podMap, podSlice +} From 4bba4449ae41a7889ad29da4bd1704ae8d19d126 Mon Sep 17 00:00:00 2001 From: Justin SB Date: Mon, 19 Aug 2019 16:22:04 -0400 Subject: [PATCH 2/2] Add tests for newly exposed drain code --- staging/src/k8s.io/kubectl/pkg/drain/BUILD | 4 + .../src/k8s.io/kubectl/pkg/drain/default.go | 2 + .../k8s.io/kubectl/pkg/drain/drain_test.go | 159 ++++++++++++++++++ 3 files changed, 165 insertions(+) diff --git a/staging/src/k8s.io/kubectl/pkg/drain/BUILD b/staging/src/k8s.io/kubectl/pkg/drain/BUILD index 6bee4c97b96..8a8673ed5a2 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/BUILD +++ b/staging/src/k8s.io/kubectl/pkg/drain/BUILD @@ -51,10 +51,14 @@ go_test( embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/api/policy/v1beta1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library", + "//staging/src/k8s.io/client-go/testing:go_default_library", ], ) diff --git a/staging/src/k8s.io/kubectl/pkg/drain/default.go b/staging/src/k8s.io/kubectl/pkg/drain/default.go index 3d118f3b2a4..ec0351b0fff 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/default.go +++ b/staging/src/k8s.io/kubectl/pkg/drain/default.go @@ -31,6 +31,7 @@ import ( // RunNodeDrain shows the canonical way to drain a node. // You should first cordon the node, e.g. using RunCordonOrUncordon func RunNodeDrain(drainer *Helper, nodeName string) error { + // TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers list, errs := drainer.GetPodsForDeletion(nodeName) if errs != nil { return utilerrors.NewAggregate(errs) @@ -48,6 +49,7 @@ func RunNodeDrain(drainer *Helper, nodeName string) error { // RunCordonOrUncordon demonstrates the canonical way to cordon or uncordon a Node func RunCordonOrUncordon(drainer *Helper, node *corev1.Node, desired bool) error { + // TODO(justinsb): Ensure we have adequate e2e coverage of this function in library consumers c := NewCordonHelper(node) if updateRequired := c.UpdateIfRequired(desired); !updateRequired { diff --git a/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go b/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go index 3f5a4467e99..6bd4b5ba9da 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go +++ b/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go @@ -19,16 +19,23 @@ package drain import ( "errors" "fmt" + "os" + "reflect" + "sort" "strconv" "testing" "time" corev1 "k8s.io/api/core/v1" + policyv1beta1 "k8s.io/api/policy/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/fake" + ktest "k8s.io/client-go/testing" ) func TestDeletePods(t *testing.T) { @@ -145,3 +152,155 @@ func createPods(ifCreateNewPods bool) (map[string]corev1.Pod, []corev1.Pod) { } return podMap, podSlice } + +// addEvictionSupport implements simple fake eviction support on the fake.Clientset +func addEvictionSupport(t *testing.T, k *fake.Clientset) { + podsEviction := metav1.APIResource{ + Name: "pods/eviction", + Kind: "Eviction", + Group: "", + Version: "v1", + } + coreResources := &metav1.APIResourceList{ + GroupVersion: "v1", + APIResources: []metav1.APIResource{podsEviction}, + } + + policyResources := &metav1.APIResourceList{ + GroupVersion: "policy/v1", + } + k.Resources = append(k.Resources, coreResources, policyResources) + + // Delete pods when evict is called + k.PrependReactor("create", "pods", func(action ktest.Action) (bool, runtime.Object, error) { + if action.GetSubresource() != "eviction" { + return false, nil, nil + } + + eviction := *action.(ktest.CreateAction).GetObject().(*policyv1beta1.Eviction) + // Avoid the lock + go func() { + err := k.CoreV1().Pods(eviction.Namespace).Delete(eviction.Name, &metav1.DeleteOptions{}) + if err != nil { + // Errorf because we can't call Fatalf from another goroutine + t.Errorf("failed to delete pod: %s/%s", eviction.Namespace, eviction.Name) + } + }() + + return true, nil, nil + }) +} + +func TestCheckEvictionSupport(t *testing.T) { + for _, evictionSupported := range []bool{true, false} { + evictionSupported := evictionSupported + t.Run(fmt.Sprintf("evictionSupported=%v", evictionSupported), + func(t *testing.T) { + k := fake.NewSimpleClientset() + if evictionSupported { + addEvictionSupport(t, k) + } + + apiGroup, err := CheckEvictionSupport(k) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + expectedAPIGroup := "" + if evictionSupported { + expectedAPIGroup = "policy/v1" + } + if apiGroup != expectedAPIGroup { + t.Fatalf("expected apigroup %q, actual=%q", expectedAPIGroup, apiGroup) + } + }) + } +} + +func TestDeleteOrEvict(t *testing.T) { + for _, evictionSupported := range []bool{true, false} { + evictionSupported := evictionSupported + t.Run(fmt.Sprintf("evictionSupported=%v", evictionSupported), + func(t *testing.T) { + h := &Helper{ + Out: os.Stdout, + GracePeriodSeconds: 10, + } + + // Create 4 pods, and try to remove the first 2 + var expectedEvictions []policyv1beta1.Eviction + var create []runtime.Object + deletePods := []corev1.Pod{} + for i := 1; i <= 4; i++ { + pod := &corev1.Pod{} + pod.Name = fmt.Sprintf("mypod-%d", i) + pod.Namespace = "default" + + create = append(create, pod) + if i <= 2 { + deletePods = append(deletePods, *pod) + + if evictionSupported { + eviction := policyv1beta1.Eviction{} + eviction.Kind = "Eviction" + eviction.APIVersion = "policy/v1" + eviction.Namespace = pod.Namespace + eviction.Name = pod.Name + + gracePeriodSeconds := int64(h.GracePeriodSeconds) + eviction.DeleteOptions = &metav1.DeleteOptions{ + GracePeriodSeconds: &gracePeriodSeconds, + } + + expectedEvictions = append(expectedEvictions, eviction) + } + } + } + + // Build the fake client + k := fake.NewSimpleClientset(create...) + if evictionSupported { + addEvictionSupport(t, k) + } + h.Client = k + + // Do the eviction + if err := h.DeleteOrEvictPods(deletePods); err != nil { + t.Fatalf("error from DeleteOrEvictPods: %v", err) + } + + // Test that other pods are still there + var remainingPods []string + { + podList, err := k.CoreV1().Pods("").List(metav1.ListOptions{}) + if err != nil { + t.Fatalf("error listing pods: %v", err) + } + + for _, pod := range podList.Items { + remainingPods = append(remainingPods, pod.Namespace+"/"+pod.Name) + } + sort.Strings(remainingPods) + } + expected := []string{"default/mypod-3", "default/mypod-4"} + if !reflect.DeepEqual(remainingPods, expected) { + t.Errorf("unexpected remaining pods after DeleteOrEvictPods; actual %v; expected %v", remainingPods, expected) + } + + // Test that pods were evicted as expected + var actualEvictions []policyv1beta1.Eviction + for _, action := range k.Actions() { + if action.GetVerb() != "create" || action.GetResource().Resource != "pods" || action.GetSubresource() != "eviction" { + continue + } + eviction := *action.(ktest.CreateAction).GetObject().(*policyv1beta1.Eviction) + actualEvictions = append(actualEvictions, eviction) + } + sort.Slice(actualEvictions, func(i, j int) bool { + return actualEvictions[i].Name < actualEvictions[j].Name + }) + if !reflect.DeepEqual(actualEvictions, expectedEvictions) { + t.Errorf("unexpected evictions; actual %v; expected %v", actualEvictions, expectedEvictions) + } + }) + } +}