kubectl: wait for all errors and successes on podEviction

This commit is contained in:
Ryan Phillips 2018-06-25 10:09:12 -05:00
parent 7786bd8c9a
commit 5b4770e083

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/apimachinery/pkg/util/strategicpatch"
@ -572,38 +573,39 @@ func (o *DrainOptions) deleteOrEvictPods(pods []corev1.Pod) error {
} }
func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {
doneCh := make(chan bool, len(pods)) returnCh := make(chan error, 1)
errCh := make(chan error, 1)
for _, pod := range pods { for _, pod := range pods {
go func(pod corev1.Pod, doneCh chan bool, errCh chan error) { go func(pod corev1.Pod, returnCh chan error) {
var err error var err error
for { for {
err = o.evictPod(pod, policyGroupVersion) err = o.evictPod(pod, policyGroupVersion)
if err == nil { if err == nil {
break break
} else if apierrors.IsNotFound(err) { } else if apierrors.IsNotFound(err) {
doneCh <- true returnCh <- nil
return return
} else if apierrors.IsTooManyRequests(err) { } else if apierrors.IsTooManyRequests(err) {
fmt.Fprintf(o.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err) fmt.Fprintf(o.ErrOut, "error when evicting pod %q (will retry after 5s): %v\n", pod.Name, err)
time.Sleep(5 * time.Second) time.Sleep(5 * time.Second)
} else { } else {
errCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err) returnCh <- fmt.Errorf("error when evicting pod %q: %v", pod.Name, err)
return return
} }
} }
podArray := []corev1.Pod{pod} podArray := []corev1.Pod{pod}
_, err = o.waitForDelete(podArray, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn) _, err = o.waitForDelete(podArray, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn)
if err == nil { if err == nil {
doneCh <- true returnCh <- nil
} else { } else {
errCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err) returnCh <- fmt.Errorf("error when waiting for pod %q terminating: %v", pod.Name, err)
} }
}(pod, doneCh, errCh) }(pod, returnCh)
} }
doneCount := 0 doneCount := 0
var errors []error
// 0 timeout means infinite, we use MaxInt64 to represent it. // 0 timeout means infinite, we use MaxInt64 to represent it.
var globalTimeout time.Duration var globalTimeout time.Duration
if o.Timeout == 0 { if o.Timeout == 0 {
@ -612,19 +614,19 @@ func (o *DrainOptions) evictPods(pods []corev1.Pod, policyGroupVersion string, g
globalTimeout = o.Timeout globalTimeout = o.Timeout
} }
globalTimeoutCh := time.After(globalTimeout) globalTimeoutCh := time.After(globalTimeout)
for { numPods := len(pods)
for doneCount < numPods {
select { select {
case err := <-errCh: case err := <-returnCh:
return err
case <-doneCh:
doneCount++ doneCount++
if doneCount == len(pods) { if err != nil {
return nil errors = append(errors, err)
} }
case <-globalTimeoutCh: case <-globalTimeoutCh:
return fmt.Errorf("Drain did not complete within %v", globalTimeout) return fmt.Errorf("Drain did not complete within %v", globalTimeout)
} }
} }
return utilerrors.NewAggregate(errors)
} }
func (o *DrainOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { func (o *DrainOptions) deletePods(pods []corev1.Pod, getPodFn func(namespace, name string) (*corev1.Pod, error)) error {