Update DeploymentReaper.Stop to use ObservedGeneration to remove race condition

This commit is contained in:
nikhiljindal 2016-02-23 20:27:24 -08:00
parent 9c1d8bf99d
commit 7e50fa6df0
5 changed files with 39 additions and 57 deletions

View File

@ -30,6 +30,7 @@ import (
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
deploymentutil "k8s.io/kubernetes/pkg/util/deployment"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/wait"
)
@ -368,52 +369,30 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
zero := 0
d.Spec.RevisionHistoryLimit = &zero
d.Spec.Replicas = 0
// TODO: un-pausing should not be necessary, remove when this is fixed:
// https://github.com/kubernetes/kubernetes/issues/20966
// Instead deployment should be Paused at this point and not at next TODO.
d.Spec.Paused = false
})
if err != nil {
return err
}
// wait for the total number of pods to drop to 0
if err := wait.Poll(reaper.pollInterval, reaper.timeout, func() (bool, error) {
curr, err := deployments.Get(name)
// if deployment was not found it must have been deleted, error out
if err != nil && errors.IsNotFound(err) {
return false, err
}
// if other errors happen, retry
if err != nil {
return false, nil
}
// check if deployment wasn't recreated with the same name
// TODO use generations when deployment will have them
if curr.UID != deployment.UID {
return false, errors.NewNotFound(extensions.Resource("Deployment"), name)
}
return curr.Status.Replicas == 0, nil
}); err != nil {
return err
}
// TODO: When deployments will allow running cleanup policy while being
// paused, move pausing to above update operation. Without it, we need to
// pause deployment before stopping RSs, to prevent creating new RSs.
// See https://github.com/kubernetes/kubernetes/issues/20966
deployment, err = reaper.updateDeploymentWithRetries(namespace, name, func(d *extensions.Deployment) {
d.Spec.Paused = true
})
if err != nil {
return err
}
// remove remaining RSs
// Use observedGeneration to determine if the deployment controller noticed the pause.
if err := deploymentutil.WaitForObservedDeployment(func() (*extensions.Deployment, error) {
return deployments.Get(name)
}, deployment.Generation, 10*time.Millisecond, 1*time.Minute); err != nil {
return err
}
// Delete deployment.
if err := deployments.Delete(name, gracePeriod); err != nil {
return err
}
// Stop all replica sets.
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
if err != nil {
return err
}
options := api.ListOptions{LabelSelector: selector}
rsList, err := replicaSets.List(options)
if err != nil {
@ -430,9 +409,7 @@ func (reaper *DeploymentReaper) Stop(namespace, name string, timeout time.Durati
if len(errList) > 0 {
return utilerrors.NewAggregate(errList)
}
// and finally deployment
return deployments.Delete(name, gracePeriod)
return nil
}
// updateDeploymentFunc is a callback that receives a pointer to a Deployment and returns nothing.
// NOTE(review): presumably this is the in-place mutation callback handed to
// updateDeploymentWithRetries — confirm against that function's signature.
type updateDeploymentFunc func(d *extensions.Deployment)

View File

@ -538,8 +538,7 @@ func TestDeploymentStop(t *testing.T) {
},
StopError: nil,
ExpectedActions: []string{"get:deployments", "update:deployments",
"get:deployments", "get:deployments", "update:deployments",
"list:replicasets", "delete:deployments"},
"get:deployments", "delete:deployments", "list:replicasets"},
},
{
Name: "Deployment with single replicaset",
@ -561,10 +560,9 @@ func TestDeploymentStop(t *testing.T) {
},
StopError: nil,
ExpectedActions: []string{"get:deployments", "update:deployments",
"get:deployments", "get:deployments", "update:deployments",
"list:replicasets", "get:replicasets", "get:replicasets",
"update:replicasets", "get:replicasets", "get:replicasets",
"delete:replicasets", "delete:deployments"},
"get:deployments", "delete:deployments", "list:replicasets",
"get:replicasets", "get:replicasets", "update:replicasets",
"get:replicasets", "get:replicasets", "delete:replicasets"},
},
}

View File

@ -427,3 +427,16 @@ func NewRSNewReplicas(deployment *extensions.Deployment, allRSs []*extensions.Re
return 0, fmt.Errorf("deployment type %v isn't supported", deployment.Spec.Strategy.Type)
}
}
// WaitForObservedDeployment polls, via wait.Poll with the given interval and timeout, until the
// deployment returned by getDeploymentFunc reports Status.ObservedGeneration >= desiredGeneration.
// An error from getDeploymentFunc is handed back from the poll condition unchanged.
// Returns an error if polling times out.
func WaitForObservedDeployment(getDeploymentFunc func() (*extensions.Deployment, error), desiredGeneration int64, interval, timeout time.Duration) error {
	// TODO: This should take clientset.Interface when all code is updated to use clientset. Keeping it this way allows the function to be used by callers who have client.Interface.
	generationObserved := func() (bool, error) {
		d, getErr := getDeploymentFunc()
		if getErr != nil {
			return false, getErr
		}
		reached := d.Status.ObservedGeneration >= desiredGeneration
		return reached, nil
	}
	return wait.Poll(interval, timeout, generationObserved)
}

View File

@ -543,7 +543,7 @@ func testPausedDeployment(f *Framework) {
Expect(err).NotTo(HaveOccurred())
// Use observedGeneration to determine if the controller noticed the resume.
err = waitForObservedDeployment(c, ns, deploymentName)
err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
@ -570,7 +570,7 @@ func testPausedDeployment(f *Framework) {
Expect(err).NotTo(HaveOccurred())
// Use observedGeneration to determine if the controller noticed the pause.
err = waitForObservedDeployment(c, ns, deploymentName)
err = waitForObservedDeployment(c, ns, deploymentName, deployment.Generation)
Expect(err).NotTo(HaveOccurred())
newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c)

View File

@ -2224,16 +2224,6 @@ func waitForDeploymentOldRSsNum(c *clientset.Clientset, ns, deploymentName strin
})
}
// waitForObservedDeployment polls (interval poll, timeout one minute) until the named deployment
// in namespace ns has Status.ObservedGeneration equal to its own Generation. An error from the
// Get call is returned from the poll condition unchanged.
func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string) error {
	generationObserved := func() (bool, error) {
		d, getErr := c.Extensions().Deployments(ns).Get(deploymentName)
		if getErr != nil {
			return false, getErr
		}
		caughtUp := d.Generation == d.Status.ObservedGeneration
		return caughtUp, nil
	}
	return wait.Poll(poll, 1*time.Minute, generationObserved)
}
func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet) {
Logf("Deployment = %+v", deployment)
for i := range allOldRSs {
@ -2242,6 +2232,10 @@ func logReplicaSetsOfDeployment(deployment *extensions.Deployment, allOldRSs []*
Logf("New ReplicaSet of deployment %s: %+v", deployment.Name, newRS)
}
// waitForObservedDeployment delegates to deploymentutil.WaitForObservedDeployment, supplying a
// getter that fetches the named deployment in namespace ns through c, together with
// desiredGeneration, the poll interval and a one-minute timeout.
func waitForObservedDeployment(c *clientset.Clientset, ns, deploymentName string, desiredGeneration int64) error {
	fetchDeployment := func() (*extensions.Deployment, error) {
		return c.Extensions().Deployments(ns).Get(deploymentName)
	}
	return deploymentutil.WaitForObservedDeployment(fetchDeployment, desiredGeneration, poll, 1*time.Minute)
}
func logPodsOfReplicaSets(c clientset.Interface, rss []*extensions.ReplicaSet, minReadySeconds int) {
allPods, err := deploymentutil.GetPodsForReplicaSets(c, rss)
if err == nil {