diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go
index 753d053db40..66226677ea9 100644
--- a/pkg/controller/deployment/deployment_controller.go
+++ b/pkg/controller/deployment/deployment_controller.go
@@ -113,11 +113,6 @@ func NewDeploymentController(dInformer informers.DeploymentInformer, rsInformer
 		UpdateFunc: dc.updateReplicaSet,
 		DeleteFunc: dc.deleteReplicaSet,
 	})
-	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
-		AddFunc:    dc.addPod,
-		UpdateFunc: dc.updatePod,
-		DeleteFunc: dc.deletePod,
-	})
 
 	dc.syncHandler = dc.syncDeployment
 	dc.dLister = dInformer.Lister()
@@ -260,83 +255,6 @@ func (dc *DeploymentController) deleteReplicaSet(obj interface{}) {
 	}
 }
 
-// getDeploymentForPod returns the deployment that manages the given Pod.
-// If there are multiple deployments for a given Pod, only return the oldest one.
-func (dc *DeploymentController) getDeploymentForPod(pod *api.Pod) *extensions.Deployment {
-	deployments, err := dc.dLister.GetDeploymentsForPod(pod)
-	if err != nil || len(deployments) == 0 {
-		glog.V(4).Infof("Error: %v. No deployment found for Pod %v, deployment controller will avoid syncing.", err, pod.Name)
-		return nil
-	}
-
-	if len(deployments) > 1 {
-		sort.Sort(util.BySelectorLastUpdateTime(deployments))
-		glog.Errorf("user error! more than one deployment is selecting pod %s/%s with labels: %#v, returning %s/%s", pod.Namespace, pod.Name, pod.Labels, deployments[0].Namespace, deployments[0].Name)
-	}
-	return deployments[0]
-}
-
-// When a pod is created, ensure its controller syncs
-func (dc *DeploymentController) addPod(obj interface{}) {
-	pod, ok := obj.(*api.Pod)
-	if !ok {
-		return
-	}
-	glog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
-	if d := dc.getDeploymentForPod(pod); d != nil {
-		dc.enqueueDeployment(d)
-	}
-}
-
-// updatePod figures out what deployment(s) manage the ReplicaSet that manages the Pod when the Pod
-// is updated and wake them up. If anything of the Pods have changed, we need to awaken both
-// the old and new deployments. old and cur must be *api.Pod types.
-func (dc *DeploymentController) updatePod(old, cur interface{}) {
-	curPod := cur.(*api.Pod)
-	oldPod := old.(*api.Pod)
-	if curPod.ResourceVersion == oldPod.ResourceVersion {
-		// Periodic resync will send update events for all known pods.
-		// Two different versions of the same pod will always have different RVs.
-		return
-	}
-	glog.V(4).Infof("Pod %s updated %#v -> %#v.", curPod.Name, oldPod, curPod)
-	if d := dc.getDeploymentForPod(curPod); d != nil {
-		dc.enqueueDeployment(d)
-	}
-	if !api.Semantic.DeepEqual(oldPod, curPod) {
-		if oldD := dc.getDeploymentForPod(oldPod); oldD != nil {
-			dc.enqueueDeployment(oldD)
-		}
-	}
-}
-
-// When a pod is deleted, ensure its controller syncs.
-// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
-func (dc *DeploymentController) deletePod(obj interface{}) {
-	pod, ok := obj.(*api.Pod)
-	// When a delete is dropped, the relist will notice a pod in the store not
-	// in the list, leading to the insertion of a tombstone object which contains
-	// the deleted key/value. Note that this value might be stale. If the pod
-	// changed labels the new ReplicaSet will not be woken up till the periodic
-	// resync.
-	if !ok {
-		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
-		if !ok {
-			glog.Errorf("Couldn't get object from tombstone %#v", obj)
-			return
-		}
-		pod, ok = tombstone.Obj.(*api.Pod)
-		if !ok {
-			glog.Errorf("Tombstone contained object that is not a pod %#v", obj)
-			return
-		}
-	}
-	glog.V(4).Infof("Pod %s deleted: %#v.", pod.Name, pod)
-	if d := dc.getDeploymentForPod(pod); d != nil {
-		dc.enqueueDeployment(d)
-	}
-}
-
 func (dc *DeploymentController) enqueueDeployment(deployment *extensions.Deployment) {
 	key, err := controller.KeyFunc(deployment)
 	if err != nil {
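The removed handlers above relied on the informer tombstone pattern: a missed delete arrives wrapped in a `DeletedFinalStateUnknown` marker and must be unwrapped before use. For reference, here is a standalone sketch of that pattern in plain Go; the `Pod` and `DeletedFinalStateUnknown` types are local stand-ins, not the real `k8s.io/kubernetes/pkg/client/cache` types.

```go
package main

import "fmt"

// Pod is a local stand-in for the real API type.
type Pod struct{ Name string }

// DeletedFinalStateUnknown models the cache tombstone: when an informer
// misses a delete event, the relist inserts this marker holding the last
// known (possibly stale) object.
type DeletedFinalStateUnknown struct {
	Key string
	Obj interface{}
}

// podFromDeleteEvent unwraps a delete-event object, handling both the
// direct case and the tombstone case, as the removed deletePod handler did.
func podFromDeleteEvent(obj interface{}) (*Pod, error) {
	if pod, ok := obj.(*Pod); ok {
		return pod, nil
	}
	tombstone, ok := obj.(DeletedFinalStateUnknown)
	if !ok {
		return nil, fmt.Errorf("couldn't get object from tombstone %#v", obj)
	}
	pod, ok := tombstone.Obj.(*Pod)
	if !ok {
		return nil, fmt.Errorf("tombstone contained object that is not a pod %#v", tombstone.Obj)
	}
	return pod, nil
}

func main() {
	direct := &Pod{Name: "web-1"}
	wrapped := DeletedFinalStateUnknown{Key: "default/web-2", Obj: &Pod{Name: "web-2"}}

	for _, obj := range []interface{}{direct, wrapped} {
		if pod, err := podFromDeleteEvent(obj); err == nil {
			fmt.Println("deleted pod:", pod.Name)
		}
	}
}
```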
diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go
index e3ec1d7c6b9..5c9318f14c6 100644
--- a/pkg/controller/deployment/recreate.go
+++ b/pkg/controller/deployment/recreate.go
@@ -42,7 +42,7 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
 	}
 	if scaledDown {
 		// Update DeploymentStatus
-		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
+		return dc.syncDeploymentStatus(allRSs, newRS, deployment)
 	}
 
 	// Wait for all old replica set to scale down to zero.
@@ -67,7 +67,7 @@ func (dc *DeploymentController) rolloutRecreate(deployment *extensions.Deploymen
 	}
 	if scaledUp {
 		// Update DeploymentStatus
-		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
+		return dc.syncDeploymentStatus(allRSs, newRS, deployment)
 	}
 
 	dc.cleanupDeployment(oldRSs, deployment)
diff --git a/pkg/controller/deployment/rolling.go b/pkg/controller/deployment/rolling.go
index 0b5e5af8c28..3a3e18f5491 100644
--- a/pkg/controller/deployment/rolling.go
+++ b/pkg/controller/deployment/rolling.go
@@ -42,7 +42,7 @@ func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment
 	}
 	if scaledUp {
 		// Update DeploymentStatus
-		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
+		return dc.syncDeploymentStatus(allRSs, newRS, deployment)
 	}
 
 	// Scale down, if we can.
@@ -52,7 +52,7 @@ func (dc *DeploymentController) rolloutRolling(deployment *extensions.Deployment
 	}
 	if scaledDown {
 		// Update DeploymentStatus
-		return dc.updateDeploymentStatus(allRSs, newRS, deployment)
+		return dc.syncDeploymentStatus(allRSs, newRS, deployment)
 	}
 
 	dc.cleanupDeployment(oldRSs, deployment)
@@ -86,14 +86,8 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
 		return false, nil
 	}
 
-	minReadySeconds := deployment.Spec.MinReadySeconds
 	allPodsCount := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
-	// TODO: use dc.getAvailablePodsForReplicaSets instead
-	newRSAvailablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{newRS}, minReadySeconds)
-	if err != nil {
-		return false, fmt.Errorf("could not find available pods: %v", err)
-	}
-	glog.V(4).Infof("New RS %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRSAvailablePodCount)
+	glog.V(4).Infof("New replica set %s/%s has %d available pods.", newRS.Namespace, newRS.Name, newRS.Status.AvailableReplicas)
 	maxUnavailable := deploymentutil.MaxUnavailable(*deployment)
 
 	// Check if we can scale down. We can scale down in the following 2 cases:
@@ -127,7 +121,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
 	// * However, newRSPodsUnavailable would also be 0, so the 2 old replica sets could be scaled down by 5 (13 - 8 - 0), which would then
 	// allow the new replica set to be scaled up by 5.
 	minAvailable := deployment.Spec.Replicas - maxUnavailable
-	newRSUnavailablePodCount := newRS.Spec.Replicas - newRSAvailablePodCount
+	newRSUnavailablePodCount := newRS.Spec.Replicas - newRS.Status.AvailableReplicas
 	maxScaledDown := allPodsCount - minAvailable - newRSUnavailablePodCount
 	if maxScaledDown <= 0 {
 		return false, nil
@@ -135,7 +129,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
 
 	// Clean up unhealthy replicas first, otherwise unhealthy replicas will block deployment
 	// and cause timeout. See https://github.com/kubernetes/kubernetes/issues/16737
-	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, deployment.Spec.MinReadySeconds, maxScaledDown)
+	oldRSs, cleanupCount, err := dc.cleanupUnhealthyReplicas(oldRSs, deployment, maxScaledDown)
 	if err != nil {
 		return false, nil
 	}
@@ -154,7 +148,7 @@ func (dc *DeploymentController) reconcileOldReplicaSets(allRSs []*extensions.Rep
 }
 
 // cleanupUnhealthyReplicas will scale down old replica sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
-func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, minReadySeconds, maxCleanupCount int32) ([]*extensions.ReplicaSet, int32, error) {
+func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.ReplicaSet, deployment *extensions.Deployment, maxCleanupCount int32) ([]*extensions.ReplicaSet, int32, error) {
 	sort.Sort(controller.ReplicaSetsByCreationTimestamp(oldRSs))
 	// Safely scale down all old replica sets with unhealthy replicas. Replica set will sort the pods in the order
 	// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
@@ -168,18 +162,13 @@ func (dc *DeploymentController) cleanupUnhealthyReplicas(oldRSs []*extensions.Re
 			// cannot scale down this replica set.
 			continue
 		}
-		// TODO: use dc.getAvailablePodsForReplicaSets instead
-		availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, []*extensions.ReplicaSet{targetRS}, minReadySeconds)
-		if err != nil {
-			return nil, totalScaledDown, fmt.Errorf("could not find available pods: %v", err)
-		}
-		glog.V(4).Infof("Found %d available pods in old RS %s/%s", availablePodCount, targetRS.Namespace, targetRS.Name)
-		if targetRS.Spec.Replicas == availablePodCount {
+		glog.V(4).Infof("Found %d available pods in old RS %s/%s", targetRS.Status.AvailableReplicas, targetRS.Namespace, targetRS.Name)
+		if targetRS.Spec.Replicas == targetRS.Status.AvailableReplicas {
 			// no unhealthy replicas found, no scaling required.
 			continue
 		}
-		scaledDownCount := int32(integer.IntMin(int(maxCleanupCount-totalScaledDown), int(targetRS.Spec.Replicas-availablePodCount)))
+		scaledDownCount := int32(integer.IntMin(int(maxCleanupCount-totalScaledDown), int(targetRS.Spec.Replicas-targetRS.Status.AvailableReplicas)))
 		newReplicasCount := targetRS.Spec.Replicas - scaledDownCount
 		if newReplicasCount > targetRS.Spec.Replicas {
 			return nil, 0, fmt.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount)
@@ -201,13 +190,8 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [
 	// Check if we can scale down.
 	minAvailable := deployment.Spec.Replicas - maxUnavailable
-	minReadySeconds := deployment.Spec.MinReadySeconds
-	// Find the number of ready pods.
-	// TODO: use dc.getAvailablePodsForReplicaSets instead
-	availablePodCount, err := deploymentutil.GetAvailablePodsForReplicaSets(dc.client, deployment, allRSs, minReadySeconds)
-	if err != nil {
-		return 0, fmt.Errorf("could not find available pods: %v", err)
-	}
+	// Find the number of available pods.
+	availablePodCount := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
 	if availablePodCount <= minAvailable {
 		// Cannot scale down.
 		return 0, nil
 	}
@@ -233,7 +217,7 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRollingUpdate(allRSs [
 		if newReplicasCount > targetRS.Spec.Replicas {
 			return 0, fmt.Errorf("when scaling down old RS, got invalid request to scale down %s/%s %d -> %d", targetRS.Namespace, targetRS.Name, targetRS.Spec.Replicas, newReplicasCount)
 		}
-		_, _, err = dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
+		_, _, err := dc.scaleReplicaSetAndRecordEvent(targetRS, newReplicasCount, deployment)
 		if err != nil {
 			return totalScaledDown, err
 		}
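The hunks above change where the availability number comes from (ReplicaSet status instead of a live pod count), but not the rolling-update arithmetic itself: the scale-down budget is `allPodsCount - minAvailable - newRSUnavailablePodCount`, floored at zero. Here is a minimal sketch of that calculation with plain `int32` values standing in for the deployment and replica-set fields; the 13/8/0 case mirrors the worked example in the comments.

```go
package main

import "fmt"

// maxScaledDown models the scale-down budget from reconcileOldReplicaSets:
// minAvailable is derived from spec.replicas and maxUnavailable, and the new
// replica set's unavailable pods are charged against the budget so a rollout
// never dips below minAvailable.
func maxScaledDown(specReplicas, maxUnavailable, allPodsCount, newRSReplicas, newRSAvailable int32) int32 {
	minAvailable := specReplicas - maxUnavailable
	newRSUnavailable := newRSReplicas - newRSAvailable
	budget := allPodsCount - minAvailable - newRSUnavailable
	if budget < 0 {
		return 0
	}
	return budget
}

func main() {
	// 10 desired, maxUnavailable=2, 13 pods total; the new RS wants 5 and
	// has 5 available: 13 - 8 - 0 = 5 old pods may be scaled down.
	fmt.Println(maxScaledDown(10, 2, 13, 5, 5)) // 5
	// Same shape, but the new RS has 0 available: 13 - 8 - 5 = 0.
	fmt.Println(maxScaledDown(10, 2, 13, 5, 0)) // 0
}
```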
diff --git a/pkg/controller/deployment/rolling_test.go b/pkg/controller/deployment/rolling_test.go
index 1c9afcd41be..b909859ecdb 100644
--- a/pkg/controller/deployment/rolling_test.go
+++ b/pkg/controller/deployment/rolling_test.go
@@ -17,15 +17,12 @@ limitations under the License.
 package deployment
 
 import (
-	"fmt"
 	"testing"
 
-	"k8s.io/kubernetes/pkg/api"
 	exp "k8s.io/kubernetes/pkg/apis/extensions"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
 	"k8s.io/kubernetes/pkg/client/record"
 	"k8s.io/kubernetes/pkg/client/testing/core"
-	"k8s.io/kubernetes/pkg/runtime"
 	"k8s.io/kubernetes/pkg/util/intstr"
 )
@@ -187,84 +184,14 @@ func TestDeploymentController_reconcileOldReplicaSets(t *testing.T) {
 		newSelector := map[string]string{"foo": "new"}
 		oldSelector := map[string]string{"foo": "old"}
 		newRS := rs("foo-new", test.newReplicas, newSelector, noTimestamp)
+		newRS.Status.AvailableReplicas = int32(test.readyPodsFromNewRS)
 		oldRS := rs("foo-old", test.oldReplicas, oldSelector, noTimestamp)
+		oldRS.Status.AvailableReplicas = int32(test.readyPodsFromOldRS)
 		oldRSs := []*exp.ReplicaSet{oldRS}
 		allRSs := []*exp.ReplicaSet{oldRS, newRS}
 		maxSurge := intstr.FromInt(0)
 		deployment := newDeployment("foo", test.deploymentReplicas, nil, &maxSurge, &test.maxUnavailable, newSelector)
 		fakeClientset := fake.Clientset{}
-		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
-			switch action.(type) {
-			case core.ListAction:
-				podList := &api.PodList{}
-				for podIndex := 0; podIndex < test.readyPodsFromOldRS; podIndex++ {
-					podList.Items = append(podList.Items, api.Pod{
-						ObjectMeta: api.ObjectMeta{
-							Name:   fmt.Sprintf("%s-oldReadyPod-%d", oldRS.Name, podIndex),
-							Labels: oldSelector,
-						},
-						Status: api.PodStatus{
-							Conditions: []api.PodCondition{
-								{
-									Type:   api.PodReady,
-									Status: api.ConditionTrue,
-								},
-							},
-						},
-					})
-				}
-				for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ {
-					podList.Items = append(podList.Items, api.Pod{
-						ObjectMeta: api.ObjectMeta{
-							Name:   fmt.Sprintf("%s-oldUnhealthyPod-%d", oldRS.Name, podIndex),
-							Labels: oldSelector,
-						},
-						Status: api.PodStatus{
-							Conditions: []api.PodCondition{
-								{
-									Type:   api.PodReady,
-									Status: api.ConditionFalse,
-								},
-							},
-						},
-					})
-				}
-				for podIndex := 0; podIndex < test.readyPodsFromNewRS; podIndex++ {
-					podList.Items = append(podList.Items, api.Pod{
-						ObjectMeta: api.ObjectMeta{
-							Name:   fmt.Sprintf("%s-newReadyPod-%d", oldRS.Name, podIndex),
-							Labels: newSelector,
-						},
-						Status: api.PodStatus{
-							Conditions: []api.PodCondition{
-								{
-									Type:   api.PodReady,
-									Status: api.ConditionTrue,
-								},
-							},
-						},
-					})
-				}
-				for podIndex := 0; podIndex < test.oldReplicas-test.readyPodsFromOldRS; podIndex++ {
-					podList.Items = append(podList.Items, api.Pod{
-						ObjectMeta: api.ObjectMeta{
-							Name:   fmt.Sprintf("%s-newUnhealthyPod-%d", oldRS.Name, podIndex),
-							Labels: newSelector,
-						},
-						Status: api.PodStatus{
-							Conditions: []api.PodCondition{
-								{
-									Type:   api.PodReady,
-									Status: api.ConditionFalse,
-								},
-							},
-						},
-					})
-				}
-				return true, podList, nil
-			}
-			return false, nil, nil
-		})
 		controller := &DeploymentController{
 			client:        &fakeClientset,
 			eventRecorder: &record.FakeRecorder{},
@@ -327,55 +254,18 @@ func TestDeploymentController_cleanupUnhealthyReplicas(t *testing.T) {
 	for i, test := range tests {
 		t.Logf("executing scenario %d", i)
 		oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp)
+		oldRS.Status.AvailableReplicas = int32(test.readyPods)
 		oldRSs := []*exp.ReplicaSet{oldRS}
 		maxSurge := intstr.FromInt(2)
 		maxUnavailable := intstr.FromInt(2)
 		deployment := newDeployment("foo", 10, nil, &maxSurge, &maxUnavailable, nil)
 		fakeClientset := fake.Clientset{}
-		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
-			switch action.(type) {
-			case core.ListAction:
-				podList := &api.PodList{}
-				for podIndex := 0; podIndex < test.readyPods; podIndex++ {
-					podList.Items = append(podList.Items, api.Pod{
-						ObjectMeta: api.ObjectMeta{
-							Name: fmt.Sprintf("%s-readyPod-%d", oldRS.Name, podIndex),
-						},
-						Status: api.PodStatus{
-							Conditions: []api.PodCondition{
-								{
-									Type:   api.PodReady,
-									Status: api.ConditionTrue,
-								},
-							},
-						},
-					})
-				}
-				for podIndex := 0; podIndex < test.unHealthyPods; podIndex++ {
-					podList.Items = append(podList.Items, api.Pod{
-						ObjectMeta: api.ObjectMeta{
-							Name: fmt.Sprintf("%s-unHealthyPod-%d", oldRS.Name, podIndex),
-						},
-						Status: api.PodStatus{
-							Conditions: []api.PodCondition{
-								{
-									Type:   api.PodReady,
-									Status: api.ConditionFalse,
-								},
-							},
-						},
-					})
-				}
-				return true, podList, nil
-			}
-			return false, nil, nil
-		})
 		controller := &DeploymentController{
 			client:        &fakeClientset,
 			eventRecorder: &record.FakeRecorder{},
 		}
-		_, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, 0, int32(test.maxCleanupCount))
+		_, cleanupCount, err := controller.cleanupUnhealthyReplicas(oldRSs, deployment, int32(test.maxCleanupCount))
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)
 			continue
@@ -439,35 +329,12 @@ func TestDeploymentController_scaleDownOldReplicaSetsForRollingUpdate(t *testing
 		test := tests[i]
 		t.Logf("executing scenario %d", i)
 		oldRS := rs("foo-v2", test.oldReplicas, nil, noTimestamp)
+		oldRS.Status.AvailableReplicas = int32(test.readyPods)
 		allRSs := []*exp.ReplicaSet{oldRS}
 		oldRSs := []*exp.ReplicaSet{oldRS}
 		maxSurge := intstr.FromInt(0)
 		deployment := newDeployment("foo", test.deploymentReplicas, nil, &maxSurge, &test.maxUnavailable, map[string]string{"foo": "bar"})
 		fakeClientset := fake.Clientset{}
-		fakeClientset.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) {
-			switch action.(type) {
-			case core.ListAction:
-				podList := &api.PodList{}
-				for podIndex := 0; podIndex < test.readyPods; podIndex++ {
-					podList.Items = append(podList.Items, api.Pod{
-						ObjectMeta: api.ObjectMeta{
-							Name:   fmt.Sprintf("%s-pod-%d", oldRS.Name, podIndex),
-							Labels: map[string]string{"foo": "bar"},
-						},
-						Status: api.PodStatus{
-							Conditions: []api.PodCondition{
-								{
-									Type:   api.PodReady,
-									Status: api.ConditionTrue,
-								},
-							},
-						},
-					})
-				}
-				return true, podList, nil
-			}
-			return false, nil, nil
-		})
 		controller := &DeploymentController{
 			client:        &fakeClientset,
 			eventRecorder: &record.FakeRecorder{},
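With the pod-list reactors gone, the rewritten tests stub `Status.AvailableReplicas` directly, so the unhealthy-replica count collapses to spec minus status, capped by the remaining cleanup budget. A standalone sketch of that bound, with a local `ReplicaSet` stub in place of `extensions.ReplicaSet`:

```go
package main

import "fmt"

// ReplicaSet is a local stub carrying only the two fields the cleanup
// logic needs after this change.
type ReplicaSet struct {
	SpecReplicas      int32
	AvailableReplicas int32
}

// scaledDownCount mirrors the bound in cleanupUnhealthyReplicas: delete at
// most the unhealthy replicas (spec - available), and never exceed what is
// left of maxCleanupCount.
func scaledDownCount(rs ReplicaSet, maxCleanupCount, totalScaledDown int32) int32 {
	unhealthy := rs.SpecReplicas - rs.AvailableReplicas
	budget := maxCleanupCount - totalScaledDown
	if unhealthy < budget {
		return unhealthy
	}
	return budget
}

func main() {
	rs := ReplicaSet{SpecReplicas: 10, AvailableReplicas: 8} // 2 unhealthy
	fmt.Println(scaledDownCount(rs, 5, 0))                   // 2: bounded by unhealthy count
	fmt.Println(scaledDownCount(rs, 1, 0))                   // 1: bounded by cleanup budget
}
```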
diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go
index 0c7cec64f5b..73a8e51ba98 100644
--- a/pkg/controller/deployment/sync.go
+++ b/pkg/controller/deployment/sync.go
@@ -259,10 +259,11 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
 		rsCopy := objCopy.(*extensions.ReplicaSet)
 
 		// Set existing new replica set's annotation
-		if deploymentutil.SetNewReplicaSetAnnotations(deployment, rsCopy, newRevision, true) {
-			if rsCopy, err = dc.client.Extensions().ReplicaSets(rsCopy.Namespace).Update(rsCopy); err != nil {
-				return nil, err
-			}
+		annotationsUpdated := deploymentutil.SetNewReplicaSetAnnotations(deployment, rsCopy, newRevision, true)
+		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != deployment.Spec.MinReadySeconds
+		if annotationsUpdated || minReadySecondsNeedsUpdate {
+			rsCopy.Spec.MinReadySeconds = deployment.Spec.MinReadySeconds
+			return dc.client.Extensions().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(rsCopy)
 		}
 
 		updateConditions := deploymentutil.SetDeploymentRevision(deployment, newRevision)
@@ -294,9 +295,10 @@ func (dc *DeploymentController) getNewReplicaSet(deployment *extensions.Deployme
 			Namespace: namespace,
 		},
 		Spec: extensions.ReplicaSetSpec{
-			Replicas: 0,
-			Selector: newRSSelector,
-			Template: newRSTemplate,
+			Replicas:        0,
+			MinReadySeconds: deployment.Spec.MinReadySeconds,
+			Selector:        newRSSelector,
+			Template:        newRSTemplate,
 		},
 	}
 	allRSs := append(oldRSs, &newRS)
@@ -478,21 +480,20 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSe
 
 // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
 func (dc *DeploymentController) syncDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, d *extensions.Deployment) error {
-	newStatus, err := dc.calculateStatus(allRSs, newRS, d)
-	if err != nil {
-		return err
+	newStatus := dc.calculateStatus(allRSs, newRS, d)
+
+	if reflect.DeepEqual(d.Status, newStatus) {
+		return nil
 	}
-	if !reflect.DeepEqual(d.Status, newStatus) {
-		return dc.updateDeploymentStatus(allRSs, newRS, d)
-	}
-	return nil
+
+	newDeployment := d
+	newDeployment.Status = newStatus
+	_, err := dc.client.Extensions().Deployments(newDeployment.Namespace).UpdateStatus(newDeployment)
+	return err
 }
 
-func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (extensions.DeploymentStatus, error) {
-	availableReplicas, err := dc.getAvailablePodsForReplicaSets(deployment, allRSs)
-	if err != nil {
-		return deployment.Status, fmt.Errorf("failed to count available pods: %v", err)
-	}
+func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) extensions.DeploymentStatus {
+	availableReplicas := deploymentutil.GetAvailableReplicaCountForReplicaSets(allRSs)
 	totalReplicas := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
 
 	return extensions.DeploymentStatus{
@@ -502,26 +503,7 @@ func (dc *DeploymentController) calculateStatus(allRSs []*extensions.ReplicaSet,
 		UpdatedReplicas:     deploymentutil.GetActualReplicaCountForReplicaSets([]*extensions.ReplicaSet{newRS}),
 		AvailableReplicas:   availableReplicas,
 		UnavailableReplicas: totalReplicas - availableReplicas,
-	}, nil
-}
-
-func (dc *DeploymentController) getAvailablePodsForReplicaSets(deployment *extensions.Deployment, rss []*extensions.ReplicaSet) (int32, error) {
-	podList, err := dc.listPods(deployment)
-	if err != nil {
-		return 0, err
-	}
+	}
-	return deploymentutil.CountAvailablePodsForReplicaSets(podList, rss, deployment.Spec.MinReadySeconds)
-}
-
-func (dc *DeploymentController) updateDeploymentStatus(allRSs []*extensions.ReplicaSet, newRS *extensions.ReplicaSet, deployment *extensions.Deployment) error {
-	newStatus, err := dc.calculateStatus(allRSs, newRS, deployment)
-	if err != nil {
-		return err
-	}
-	newDeployment := deployment
-	newDeployment.Status = newStatus
-	_, err = dc.client.Extensions().Deployments(deployment.Namespace).UpdateStatus(newDeployment)
-	return err
-}
 }
 
 // isScalingEvent checks whether the provided deployment has been updated with a scaling event
diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go
index 423f974fd17..3502e77d8f7 100644
--- a/pkg/controller/deployment/util/deployment_util.go
+++ b/pkg/controller/deployment/util/deployment_util.go
@@ -39,7 +39,6 @@ import (
 	intstrutil "k8s.io/kubernetes/pkg/util/intstr"
 	labelsutil "k8s.io/kubernetes/pkg/util/labels"
 	podutil "k8s.io/kubernetes/pkg/util/pod"
-	rsutil "k8s.io/kubernetes/pkg/util/replicaset"
 	"k8s.io/kubernetes/pkg/util/wait"
 )
 
@@ -636,65 +635,35 @@ func SetFromReplicaSetTemplate(deployment *extensions.Deployment, template api.P
 
 // GetReplicaCountForReplicaSets returns the sum of Replicas of the given replica sets.
 func GetReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) int32 {
-	totalReplicaCount := int32(0)
+	totalReplicas := int32(0)
 	for _, rs := range replicaSets {
 		if rs != nil {
-			totalReplicaCount += rs.Spec.Replicas
+			totalReplicas += rs.Spec.Replicas
 		}
 	}
-	return totalReplicaCount
+	return totalReplicas
 }
 
 // GetActualReplicaCountForReplicaSets returns the sum of actual replicas of the given replica sets.
 func GetActualReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) int32 {
-	totalReplicaCount := int32(0)
+	totalActualReplicas := int32(0)
 	for _, rs := range replicaSets {
 		if rs != nil {
-			totalReplicaCount += rs.Status.Replicas
+			totalActualReplicas += rs.Status.Replicas
 		}
 	}
-	return totalReplicaCount
+	return totalActualReplicas
 }
 
-// GetAvailablePodsForReplicaSets returns the number of available pods (listed from clientset) corresponding to the given replica sets.
-func GetAvailablePodsForReplicaSets(c clientset.Interface, deployment *extensions.Deployment, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
-	podList, err := listPods(deployment, c)
-	if err != nil {
-		return 0, err
-	}
-	return CountAvailablePodsForReplicaSets(podList, rss, minReadySeconds)
-}
-
-// CountAvailablePodsForReplicaSets returns the number of available pods corresponding to the given pod list and replica sets.
-// Note that the input pod list should be the pods targeted by the deployment of input replica sets.
-func CountAvailablePodsForReplicaSets(podList *api.PodList, rss []*extensions.ReplicaSet, minReadySeconds int32) (int32, error) {
-	rsPods, err := filterPodsMatchingReplicaSets(rss, podList)
-	if err != nil {
-		return 0, err
-	}
-	return countAvailablePods(rsPods, minReadySeconds), nil
-}
-
-// GetAvailablePodsForDeployment returns the number of available pods (listed from clientset) corresponding to the given deployment.
-func GetAvailablePodsForDeployment(c clientset.Interface, deployment *extensions.Deployment) (int32, error) {
-	podList, err := listPods(deployment, c)
-	if err != nil {
-		return 0, err
-	}
-	return countAvailablePods(podList.Items, deployment.Spec.MinReadySeconds), nil
-}
-
-func countAvailablePods(pods []api.Pod, minReadySeconds int32) int32 {
-	availablePodCount := int32(0)
-	for _, pod := range pods {
-		// TODO: Make the time.Now() as argument to allow unit test this.
-		// FIXME: avoid using time.Now
-		if IsPodAvailable(&pod, minReadySeconds, time.Now()) {
-			glog.V(4).Infof("Pod %s/%s is available.", pod.Namespace, pod.Name)
-			availablePodCount++
+// GetAvailableReplicaCountForReplicaSets returns the number of available pods corresponding to the given replica sets.
+func GetAvailableReplicaCountForReplicaSets(replicaSets []*extensions.ReplicaSet) int32 {
+	totalAvailableReplicas := int32(0)
+	for _, rs := range replicaSets {
+		if rs != nil {
+			totalAvailableReplicas += rs.Status.AvailableReplicas
 		}
 	}
-	return availablePodCount
+	return totalAvailableReplicas
}
 
 // IsPodAvailable return true if the pod is available.
@@ -722,22 +691,6 @@ func IsPodAvailable(pod *api.Pod, minReadySeconds int32, now time.Time) bool {
 	return false
 }
 
-// filterPodsMatchingReplicaSets filters the given pod list and only return the ones targeted by the input replicasets
-func filterPodsMatchingReplicaSets(replicaSets []*extensions.ReplicaSet, podList *api.PodList) ([]api.Pod, error) {
-	rsPods := []api.Pod{}
-	for _, rs := range replicaSets {
-		matchingFunc, err := rsutil.MatchingPodsFunc(rs)
-		if err != nil {
-			return nil, err
-		}
-		if matchingFunc == nil {
-			continue
-		}
-		rsPods = append(rsPods, podutil.Filter(podList, matchingFunc)...)
-	}
-	return rsPods, nil
-}
-
 // IsRollingUpdate returns true if the strategy type is a rolling update.
 func IsRollingUpdate(deployment *extensions.Deployment) bool {
 	return deployment.Spec.Strategy.Type == extensions.RollingUpdateDeploymentStrategyType
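With the pod-counting helpers deleted, `syncDeploymentStatus` folds in the old `updateDeploymentStatus` body and only calls `UpdateStatus` when the computed status actually differs from what is recorded. A standalone sketch of that short-circuit, with local stand-in types and a fake updater in place of the real client:

```go
package main

import (
	"fmt"
	"reflect"
)

// DeploymentStatus and Deployment are local stand-ins, not the real
// extensions types.
type DeploymentStatus struct {
	Replicas            int32
	AvailableReplicas   int32
	UnavailableReplicas int32
}

type Deployment struct {
	Status DeploymentStatus
}

// syncStatus models the new syncDeploymentStatus: compute the desired
// status, skip the API round-trip when nothing changed, otherwise push it.
func syncStatus(d *Deployment, desired DeploymentStatus, update func(DeploymentStatus) error) error {
	if reflect.DeepEqual(d.Status, desired) {
		return nil // status already up to date; no UpdateStatus call
	}
	d.Status = desired
	return update(desired)
}

func main() {
	d := &Deployment{Status: DeploymentStatus{Replicas: 5, AvailableReplicas: 5}}
	calls := 0
	update := func(DeploymentStatus) error { calls++; return nil }

	syncStatus(d, DeploymentStatus{Replicas: 5, AvailableReplicas: 5}, update) // no-op
	syncStatus(d, DeploymentStatus{Replicas: 5, AvailableReplicas: 4, UnavailableReplicas: 1}, update)
	fmt.Println("UpdateStatus calls:", calls) // 1
}
```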
diff --git a/pkg/controller/deployment/util/deployment_util_test.go b/pkg/controller/deployment/util/deployment_util_test.go
index 4eab29807ab..4ee0a86624f 100644
--- a/pkg/controller/deployment/util/deployment_util_test.go
+++ b/pkg/controller/deployment/util/deployment_util_test.go
@@ -99,40 +99,6 @@ func newPod(now time.Time, ready bool, beforeSec int) api.Pod {
 	}
 }
 
-func TestCountAvailablePods(t *testing.T) {
-	now := time.Now()
-	tests := []struct {
-		pods            []api.Pod
-		minReadySeconds int
-		expected        int
-	}{
-		{
-			[]api.Pod{
-				newPod(now, true, 0),
-				newPod(now, true, 2),
-				newPod(now, false, 1),
-			},
-			1,
-			1,
-		},
-		{
-			[]api.Pod{
-				newPod(now, true, 2),
-				newPod(now, true, 11),
-				newPod(now, true, 5),
-			},
-			10,
-			1,
-		},
-	}
-
-	for _, test := range tests {
-		if count := countAvailablePods(test.pods, int32(test.minReadySeconds)); int(count) != test.expected {
-			t.Errorf("Pods = %#v, minReadySeconds = %d, expected %d, got %d", test.pods, test.minReadySeconds, test.expected, count)
-		}
-	}
-}
-
 // generatePodFromRS creates a pod, with the input ReplicaSet's selector and its template
 func generatePodFromRS(rs extensions.ReplicaSet) api.Pod {
 	return api.Pod{
diff --git a/test/e2e/framework/util.go b/test/e2e/framework/util.go
index b858b70a3fa..77a865246b3 100644
--- a/test/e2e/framework/util.go
+++ b/test/e2e/framework/util.go
@@ -3507,10 +3507,6 @@ func WaitForDeploymentStatusValid(c clientset.Interface, d *extensions.Deploymen
 			}
 		}
 		totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
-		totalAvailable, err := deploymentutil.GetAvailablePodsForDeployment(c, deployment)
-		if err != nil {
-			return false, err
-		}
 		maxCreated := deployment.Spec.Replicas + deploymentutil.MaxSurge(*deployment)
 		if totalCreated > maxCreated {
 			reason = fmt.Sprintf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
@@ -3518,8 +3514,8 @@ func WaitForDeploymentStatusValid(c clientset.Interface, d *extensions.Deploymen
 			return false, nil
 		}
 		minAvailable := deploymentutil.MinAvailable(deployment)
-		if totalAvailable < minAvailable {
-			reason = fmt.Sprintf("total pods available: %d, less than the min required: %d", totalAvailable, minAvailable)
+		if deployment.Status.AvailableReplicas < minAvailable {
+			reason = fmt.Sprintf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
 			Logf(reason)
 			return false, nil
 		}
@@ -3568,10 +3564,6 @@ func WaitForDeploymentStatus(c clientset.Interface, d *extensions.Deployment) er
 			}
 		}
 		totalCreated := deploymentutil.GetReplicaCountForReplicaSets(allRSs)
-		totalAvailable, err := deploymentutil.GetAvailablePodsForDeployment(c, deployment)
-		if err != nil {
-			return false, err
-		}
 		maxCreated := deployment.Spec.Replicas + deploymentutil.MaxSurge(*deployment)
 		if totalCreated > maxCreated {
 			logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
@@ -3579,10 +3571,10 @@ func WaitForDeploymentStatus(c clientset.Interface, d *extensions.Deployment) er
 			return false, fmt.Errorf("total pods created: %d, more than the max allowed: %d", totalCreated, maxCreated)
 		}
 		minAvailable := deploymentutil.MinAvailable(deployment)
-		if totalAvailable < minAvailable {
+		if deployment.Status.AvailableReplicas < minAvailable {
 			logReplicaSetsOfDeployment(deployment, allOldRSs, newRS)
 			logPodsOfDeployment(c, deployment)
-			return false, fmt.Errorf("total pods available: %d, less than the min required: %d", totalAvailable, minAvailable)
+			return false, fmt.Errorf("total pods available: %d, less than the min required: %d", deployment.Status.AvailableReplicas, minAvailable)
 		}
 
 		// When the deployment status and its underlying resources reach the desired state, we're done
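On the e2e side, the wait helpers now compare `deployment.Status.AvailableReplicas` against the minimum instead of re-listing and re-counting pods on every poll. A standalone sketch of that polling shape; the getter is a hypothetical stand-in for reading the deployment from the API server:

```go
package main

import (
	"fmt"
	"time"
)

// waitForMinAvailable polls the recorded availableReplicas until it reaches
// minAvailable or the deadline passes, mirroring the condition used by the
// updated WaitForDeploymentStatus helpers.
func waitForMinAvailable(get func() int32, minAvailable int32, timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for {
		available := get()
		if available >= minAvailable {
			return nil
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("total pods available: %d, less than the min required: %d", available, minAvailable)
		}
		time.Sleep(interval)
	}
}

func main() {
	available := int32(5)
	get := func() int32 {
		available++ // simulate pods becoming available between polls
		return available
	}
	if err := waitForMinAvailable(get, 8, time.Second, 10*time.Millisecond); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println("deployment is available")
}
```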