From 0e8998a040efe5dd82592911e739fe2e2258cfa3 Mon Sep 17 00:00:00 2001 From: Michael Gugino Date: Tue, 15 Oct 2019 15:24:27 -0400 Subject: [PATCH 1/2] kubectl drain: avoid leaking goroutines Recently, kubectl drain has been refactored to be consumeable as a library. Currently, if a pod cannot be evicted due to PDBs, we will leak a goroutine. This commit ensures the goroutine always exists. Related-bug: https://github.com/kubernetes/kubernetes/issues/81333 --- staging/src/k8s.io/kubectl/pkg/drain/drain.go | 44 ++++++++++++------- .../k8s.io/kubectl/pkg/drain/drain_test.go | 4 +- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/staging/src/k8s.io/kubectl/pkg/drain/drain.go b/staging/src/k8s.io/kubectl/pkg/drain/drain.go index 2e9c9e8c49a..3c1ebeef9ef 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/drain.go +++ b/staging/src/k8s.io/kubectl/pkg/drain/drain.go @@ -17,6 +17,7 @@ limitations under the License. package drain import ( + "context" "fmt" "io" "math" @@ -184,7 +185,6 @@ func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error { getPodFn := func(namespace, name string) (*corev1.Pod, error) { return d.Client.CoreV1().Pods(namespace).Get(name, metav1.GetOptions{}) } - if len(policyGroupVersion) > 0 { return d.evictPods(pods, policyGroupVersion, getPodFn) } @@ -194,11 +194,26 @@ func (d *Helper) DeleteOrEvictPods(pods []corev1.Pod) error { func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodFn func(namespace, name string) (*corev1.Pod, error)) error { returnCh := make(chan error, 1) - + // 0 timeout means infinite, we use MaxInt64 to represent it. + var globalTimeout time.Duration + if d.Timeout == 0 { + globalTimeout = time.Duration(math.MaxInt64) + } else { + globalTimeout = d.Timeout + } + ctx, cancel := context.WithTimeout(context.TODO(), globalTimeout) + defer cancel() for _, pod := range pods { go func(pod corev1.Pod, returnCh chan error) { for { fmt.Fprintf(d.Out, "evicting pod %q\n", pod.Name) + select { + case <-ctx.Done(): + // return here or we'll leak a goroutine. + returnCh <- fmt.Errorf("error when evicting pod %q: global timeout", pod.Name) + return + default: + } err := d.EvictPod(pod, policyGroupVersion) if err == nil { break @@ -213,7 +228,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF return } } - _, err := waitForDelete([]corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted) + _, err := waitForDelete(ctx, []corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted) if err == nil { returnCh <- nil } else { @@ -225,14 +240,6 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF doneCount := 0 var errors []error - // 0 timeout means infinite, we use MaxInt64 to represent it. - var globalTimeout time.Duration - if d.Timeout == 0 { - globalTimeout = time.Duration(math.MaxInt64) - } else { - globalTimeout = d.Timeout - } - globalTimeoutCh := time.After(globalTimeout) numPods := len(pods) for doneCount < numPods { select { @@ -241,10 +248,10 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF if err != nil { errors = append(errors, err) } - case <-globalTimeoutCh: - return fmt.Errorf("drain did not complete within %v", globalTimeout) + default: } } + return utilerrors.NewAggregate(errors) } @@ -262,11 +269,12 @@ func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name str return err } } - _, err := waitForDelete(pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted) + ctx := context.TODO() + _, err := waitForDelete(ctx, pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted) return err } -func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) { +func waitForDelete(ctx context.Context, pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) { err := wait.PollImmediate(interval, timeout, func() (bool, error) { pendingPods := []corev1.Pod{} for i, pod := range pods { @@ -284,6 +292,12 @@ func waitForDelete(pods []corev1.Pod, interval, timeout time.Duration, usingEvic } pods = pendingPods if len(pendingPods) > 0 { + select { + case <-ctx.Done(): + return false, fmt.Errorf("global timeout") + default: + return false, nil + } return false, nil } return true, nil diff --git a/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go b/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go index 6bd4b5ba9da..1693079ff13 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go +++ b/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go @@ -17,6 +17,7 @@ limitations under the License. package drain import ( + "context" "errors" "fmt" "os" @@ -105,7 +106,8 @@ func TestDeletePods(t *testing.T) { for _, test := range tests { t.Run(test.description, func(t *testing.T) { _, pods := createPods(false) - pendingPods, err := waitForDelete(pods, test.interval, test.timeout, false, test.getPodFn, nil) + ctx := context.TODO() + pendingPods, err := waitForDelete(ctx, pods, test.interval, test.timeout, false, test.getPodFn, nil) if test.expectError { if err == nil { From 0088a9e6a8c47b2710fc49b82a912861cda8a554 Mon Sep 17 00:00:00 2001 From: Michael Gugino Date: Mon, 21 Oct 2019 15:21:26 -0400 Subject: [PATCH 2/2] Add timeout info to kubectl drain logging --- staging/src/k8s.io/kubectl/pkg/drain/drain.go | 10 +++++----- staging/src/k8s.io/kubectl/pkg/drain/drain_test.go | 3 ++- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/staging/src/k8s.io/kubectl/pkg/drain/drain.go b/staging/src/k8s.io/kubectl/pkg/drain/drain.go index 3c1ebeef9ef..a941b74935a 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/drain.go +++ b/staging/src/k8s.io/kubectl/pkg/drain/drain.go @@ -210,7 +210,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF select { case <-ctx.Done(): // return here or we'll leak a goroutine. - returnCh <- fmt.Errorf("error when evicting pod %q: global timeout", pod.Name) + returnCh <- fmt.Errorf("error when evicting pod %q: global timeout reached: %v", pod.Name, globalTimeout) return default: } @@ -228,7 +228,7 @@ func (d *Helper) evictPods(pods []corev1.Pod, policyGroupVersion string, getPodF return } } - _, err := waitForDelete(ctx, []corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted) + _, err := waitForDelete(ctx, []corev1.Pod{pod}, 1*time.Second, time.Duration(math.MaxInt64), true, getPodFn, d.OnPodDeletedOrEvicted, globalTimeout) if err == nil { returnCh <- nil } else { @@ -270,11 +270,11 @@ func (d *Helper) deletePods(pods []corev1.Pod, getPodFn func(namespace, name str } } ctx := context.TODO() - _, err := waitForDelete(ctx, pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted) + _, err := waitForDelete(ctx, pods, 1*time.Second, globalTimeout, false, getPodFn, d.OnPodDeletedOrEvicted, globalTimeout) return err } -func waitForDelete(ctx context.Context, pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool)) ([]corev1.Pod, error) { +func waitForDelete(ctx context.Context, pods []corev1.Pod, interval, timeout time.Duration, usingEviction bool, getPodFn func(string, string) (*corev1.Pod, error), onDoneFn func(pod *corev1.Pod, usingEviction bool), globalTimeout time.Duration) ([]corev1.Pod, error) { err := wait.PollImmediate(interval, timeout, func() (bool, error) { pendingPods := []corev1.Pod{} for i, pod := range pods { @@ -294,7 +294,7 @@ func waitForDelete(ctx context.Context, pods []corev1.Pod, interval, timeout tim if len(pendingPods) > 0 { select { case <-ctx.Done(): - return false, fmt.Errorf("global timeout") + return false, fmt.Errorf("global timeout reached: %v", globalTimeout) default: return false, nil } diff --git a/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go b/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go index 1693079ff13..d6ecca17212 100644 --- a/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go +++ b/staging/src/k8s.io/kubectl/pkg/drain/drain_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "fmt" + "math" "os" "reflect" "sort" @@ -107,7 +108,7 @@ func TestDeletePods(t *testing.T) { t.Run(test.description, func(t *testing.T) { _, pods := createPods(false) ctx := context.TODO() - pendingPods, err := waitForDelete(ctx, pods, test.interval, test.timeout, false, test.getPodFn, nil) + pendingPods, err := waitForDelete(ctx, pods, test.interval, test.timeout, false, test.getPodFn, nil, time.Duration(math.MaxInt64)) if test.expectError { if err == nil {