Merge pull request #9515 from markturansky/scrub_thread_leak

Fixes thread leak from discarded watch
This commit is contained in:
Abhi Shah 2015-06-12 16:52:21 -07:00
commit 3e8690c28b
2 changed files with 11 additions and 5 deletions

View File

@ -82,7 +82,10 @@ func internalScrubPodVolumeAndWatchUntilCompletion(pod *api.Pod, scrubberClient
defer scrubberClient.DeletePod(pod.Name, pod.Namespace) defer scrubberClient.DeletePod(pod.Name, pod.Namespace)
nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion) stopChannel := make(chan struct{})
defer close(stopChannel)
nextPod := scrubberClient.WatchPod(pod.Name, pod.Namespace, pod.ResourceVersion, stopChannel)
for { for {
watchedPod := nextPod() watchedPod := nextPod()
if watchedPod.Status.Phase == api.PodSucceeded { if watchedPod.Status.Phase == api.PodSucceeded {
@ -106,7 +109,7 @@ type scrubberClient interface {
CreatePod(pod *api.Pod) (*api.Pod, error) CreatePod(pod *api.Pod) (*api.Pod, error)
GetPod(name, namespace string) (*api.Pod, error) GetPod(name, namespace string) (*api.Pod, error)
DeletePod(name, namespace string) error DeletePod(name, namespace string) error
WatchPod(name, namespace, resourceVersion string) func() *api.Pod WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod
} }
func newScrubberClient(client client.Interface) scrubberClient { func newScrubberClient(client client.Interface) scrubberClient {
@ -129,7 +132,10 @@ func (c *realScrubberClient) DeletePod(name, namespace string) error {
return c.client.Pods(namespace).Delete(name, nil) return c.client.Pods(namespace).Delete(name, nil)
} }
func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod { // WatchPod returns a ListWatch for watching a pod. The stopChannel is used
// to close the reflector backing the watch. The caller is responsible for derring a close on the channel to
// stop the reflector.
func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
fieldSelector, _ := fields.ParseSelector("metadata.name=" + name) fieldSelector, _ := fields.ParseSelector("metadata.name=" + name)
podLW := &cache.ListWatch{ podLW := &cache.ListWatch{
@ -141,7 +147,7 @@ func (c *realScrubberClient) WatchPod(name, namespace, resourceVersion string) f
}, },
} }
queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc) queue := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).Run() cache.NewReflector(podLW, &api.Pod{}, queue, 1*time.Minute).RunUntil(stopChannel)
return func() *api.Pod { return func() *api.Pod {
obj := queue.Pop() obj := queue.Pop()

View File

@ -95,7 +95,7 @@ func (c *mockScrubberClient) DeletePod(name, namespace string) error {
return nil return nil
} }
func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string) func() *api.Pod { func (c *mockScrubberClient) WatchPod(name, namespace, resourceVersion string, stopChannel chan struct{}) func() *api.Pod {
return func() *api.Pod { return func() *api.Pod {
return c.pod return c.pod
} }