Merge pull request #30670 from caesarxuchao/fix-waitForPodsGone

[GarbageCollector] Fix waitForRCPodsGone
This commit is contained in:
Marek Grabowski 2016-08-17 11:34:09 +02:00 committed by GitHub
commit b17379007a

View File

@ -52,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubectl"
@ -231,7 +232,8 @@ func GetMasterHost() string {
// PodStore is a convenient wrapper around cache.Store that returns a list of
// *api.Pod instead of interface{}.
type PodStore struct {
	cache.Store
	// stopCh is the channel handed to the reflector's RunUntil in NewPodStore.
	stopCh chan struct{}
	// reflector is the reflector that populates Store. It is retained so that
	// callers can inspect its LastSyncResourceVersion.
	reflector *cache.Reflector
}
func NewPodStore(c *client.Client, namespace string, label labels.Selector, field fields.Selector) *PodStore {
@ -249,8 +251,9 @@ func NewPodStore(c *client.Client, namespace string, label labels.Selector, fiel
}
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
stopCh := make(chan struct{})
cache.NewReflector(lw, &api.Pod{}, store, 0).RunUntil(stopCh)
return &PodStore{store, stopCh}
reflector := cache.NewReflector(lw, &api.Pod{}, store, 0)
reflector.RunUntil(stopCh)
return &PodStore{store, stopCh, reflector}
}
func (s *PodStore) List() []*api.Pod {
@ -3068,6 +3071,11 @@ func DeleteRCAndPods(c *client.Client, ns, name string) error {
}
return err
}
ps, err := podStoreForRC(c, rc)
if err != nil {
return err
}
defer ps.Stop()
startTime := time.Now()
err = reaper.Stop(ns, name, 0, api.NewDeleteOptions(0))
if apierrs.IsNotFound(err) {
@ -3079,12 +3087,18 @@ func DeleteRCAndPods(c *client.Client, ns, name string) error {
if err != nil {
return fmt.Errorf("error while stopping RC: %s: %v", name, err)
}
err = waitForRCPodsGone(c, rc, nil)
err = waitForPodsInactive(ps, 10*time.Millisecond, 10*time.Minute)
if err != nil {
return fmt.Errorf("error while deleting RC %s: %v", name, err)
return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
}
terminatePodTime := time.Now().Sub(startTime) - deleteRCTime
Logf("Terminating RC %s pods took: %v", name, terminatePodTime)
// this is to relieve namespace controller's pressure when deleting the
// namespace after a test.
err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute)
if err != nil {
return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
}
return nil
}
@ -3099,6 +3113,11 @@ func DeleteRCAndWaitForGC(c *client.Client, ns, name string) error {
}
return err
}
ps, err := podStoreForRC(c, rc)
if err != nil {
return err
}
defer ps.Stop()
startTime := time.Now()
falseVar := false
deleteOption := &api.DeleteOptions{OrphanDependents: &falseVar}
@ -3112,29 +3131,53 @@ func DeleteRCAndWaitForGC(c *client.Client, ns, name string) error {
}
deleteRCTime := time.Now().Sub(startTime)
Logf("Deleting RC %s took: %v", name, deleteRCTime)
timeout := 10 * time.Minute
err = waitForRCPodsGone(c, rc, &timeout)
err = waitForPodsInactive(ps, 10*time.Millisecond, 10*time.Minute)
if err != nil {
return fmt.Errorf("error while deleting RC %s: %v", name, err)
return fmt.Errorf("error while waiting for pods to become inactive %s: %v", name, err)
}
terminatePodTime := time.Now().Sub(startTime) - deleteRCTime
Logf("Terminating RC %s pods took: %v", name, terminatePodTime)
err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute)
if err != nil {
return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
}
return nil
}
// waitForRCPodsGone waits until there are no pods reported under an RC's selector (because the pods
// have completed termination).
func waitForRCPodsGone(c *client.Client, rc *api.ReplicationController, timeout *time.Duration) error {
if timeout == nil {
defaultTimeout := 2 * time.Minute
timeout = &defaultTimeout
}
// podStoreForRC creates a PodStore that monitors the pods matching rc's
// selector in rc's namespace. It waits, for up to one minute, until the
// store's reflector reports a non-empty LastSyncResourceVersion (i.e. it has
// done a List()) before returning.
//
// On success the caller owns the returned store and is expected to call Stop
// on it. If the reflector does not sync in time, the store is stopped here and
// (nil, err) is returned, so callers that simply return on error do not leave
// a running store behind.
func podStoreForRC(c *client.Client, rc *api.ReplicationController) (*PodStore, error) {
	// Named selector rather than labels so the labels package is not shadowed.
	selector := labels.SelectorFromSet(rc.Spec.Selector)
	ps := NewPodStore(c, rc.Namespace, selector, fields.Everything())
	// PollImmediate evaluates the condition right away instead of sleeping a
	// full interval before the first check.
	err := wait.PollImmediate(1*time.Second, 1*time.Minute, func() (bool, error) {
		return len(ps.reflector.LastSyncResourceVersion()) != 0, nil
	})
	if err != nil {
		ps.Stop()
		return nil, err
	}
	return ps, nil
}
return wait.PollImmediate(Poll, *timeout, func() (bool, error) {
if pods := PodStore.List(); len(pods) == 0 {
// waitForPodsInactive polls ps every interval, starting immediately, until
// none of the pods it lists is active, or until timeout expires.
// Waiting for "inactive" rather than "gone" keeps the deletion-time comparison
// between DeleteRCAndPods and DeleteRCAndWaitForGC fair, because the RC
// controller decreases status.replicas when a pod becomes inactive.
func waitForPodsInactive(ps *PodStore, interval, timeout time.Duration) error {
	noActivePods := func() (bool, error) {
		for _, p := range ps.List() {
			if controller.IsPodActive(*p) {
				// At least one pod is still active; keep polling.
				return false, nil
			}
		}
		return true, nil
	}
	return wait.PollImmediate(interval, timeout, noActivePods)
}
// waitForPodsGone waits until there are no pods left in the PodStore.
func waitForPodsGone(ps *PodStore, interval, timeout time.Duration) error {
return wait.PollImmediate(interval, timeout, func() (bool, error) {
if pods := ps.List(); len(pods) == 0 {
return true, nil
}
return false, nil
@ -4284,9 +4327,14 @@ func ScaleRCByLabels(client *client.Client, ns string, l map[string]string, repl
return err
}
if replicas == 0 {
if err := waitForRCPodsGone(client, rc, nil); err != nil {
ps, err := podStoreForRC(client, rc)
if err != nil {
return err
}
defer ps.Stop()
if err = waitForPodsGone(ps, 10*time.Second, 10*time.Minute); err != nil {
return fmt.Errorf("error while waiting for pods gone %s: %v", name, err)
}
} else {
if err := WaitForPodsWithLabelRunning(
client, ns, labels.SelectorFromSet(labels.Set(rc.Spec.Selector))); err != nil {