diff --git a/pkg/controller/deployment/BUILD b/pkg/controller/deployment/BUILD index f6b6ec4cfb0..a38aa4f3ec5 100644 --- a/pkg/controller/deployment/BUILD +++ b/pkg/controller/deployment/BUILD @@ -71,6 +71,7 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", + "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", "//vendor:k8s.io/apimachinery/pkg/util/uuid", "//vendor:k8s.io/client-go/testing", diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index 54b85dbb4bc..fc6cd8577b3 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -26,6 +26,7 @@ import ( "time" "github.com/golang/glog" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -537,10 +538,8 @@ func (dc *DeploymentController) getPodMapForDeployment(d *extensions.Deployment, podMap[rs.UID] = &v1.PodList{} } for _, pod := range pods { - // Ignore inactive Pods since that's what ReplicaSet does. - if !controller.IsPodActive(pod) { - continue - } + // Do not ignore inactive Pods because Recreate Deployments need to verify that no + // Pods from older versions are running before spinning up new Pods. controllerRef := controller.GetControllerOf(pod) if controllerRef == nil { continue @@ -614,6 +613,10 @@ func (dc *DeploymentController) syncDeployment(key string) error { return err } // List all Pods owned by this Deployment, grouped by their ReplicaSet. + // Current uses of the podMap are: + // + // * check if a Pod is labeled correctly with the pod-template-hash label. + // * check that no old Pods are running in the middle of Recreate Deployments. 
podMap, err := dc.getPodMapForDeployment(d, rsList) if err != nil { return err diff --git a/pkg/controller/deployment/deployment_controller_test.go b/pkg/controller/deployment/deployment_controller_test.go index 686030ceabd..2cfe20da267 100644 --- a/pkg/controller/deployment/deployment_controller_test.go +++ b/pkg/controller/deployment/deployment_controller_test.go @@ -560,18 +560,21 @@ func TestGetPodMapForReplicaSets(t *testing.T) { for _, podList := range podMap { podCount += len(podList.Items) } - if got, want := podCount, 2; got != want { + if got, want := podCount, 3; got != want { t.Errorf("podCount = %v, want %v", got, want) } if got, want := len(podMap), 2; got != want { t.Errorf("len(podMap) = %v, want %v", got, want) } - if got, want := len(podMap[rs1.UID].Items), 1; got != want { + if got, want := len(podMap[rs1.UID].Items), 2; got != want { t.Errorf("len(podMap[rs1]) = %v, want %v", got, want) } - if got, want := podMap[rs1.UID].Items[0].Name, "rs1-pod"; got != want { - t.Errorf("podMap[rs1] = [%v], want [%v]", got, want) + expect := map[string]struct{}{"rs1-pod": {}, "pod4": {}} + for _, pod := range podMap[rs1.UID].Items { + if _, ok := expect[pod.Name]; !ok { + t.Errorf("unexpected pod name for rs1: %s", pod.Name) + } } if got, want := len(podMap[rs2.UID].Items), 1; got != want { t.Errorf("len(podMap[rs2]) = %v, want %v", got, want) diff --git a/pkg/controller/deployment/recreate.go b/pkg/controller/deployment/recreate.go index 916649de9ee..b47a391bf28 100644 --- a/pkg/controller/deployment/recreate.go +++ b/pkg/controller/deployment/recreate.go @@ -21,6 +21,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/deployment/util" ) // rolloutRecreate implements the logic for recreating a replica set. 
@@ -43,18 +44,12 @@ func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList return dc.syncRolloutStatus(allRSs, newRS, d) } - newStatus := calculateStatus(allRSs, newRS, d) // Do not process a deployment when it has old pods running. - if newStatus.UpdatedReplicas == 0 { - for _, podList := range podMap { - if len(podList.Items) > 0 { - return dc.syncRolloutStatus(allRSs, newRS, d) - } - } + if oldPodsRunning(newRS, oldRSs, podMap) { + return dc.syncRolloutStatus(allRSs, newRS, d) } // If we need to create a new RS, create it now. - // TODO: Create a new RS without re-listing all RSs. if newRS == nil { newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true) if err != nil { @@ -64,14 +59,9 @@ func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList } // scale up new replica set. - scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d) - if err != nil { + if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil { return err } - if scaledUp { - // Update DeploymentStatus. - return dc.syncRolloutStatus(allRSs, newRS, d) - } // Sync deployment status. return dc.syncRolloutStatus(allRSs, newRS, d) @@ -98,6 +88,23 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext return scaled, nil } +// oldPodsRunning returns whether there are old pods running or any of the old ReplicaSets thinks that it runs pods. +func oldPodsRunning(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) bool { + if oldPods := util.GetActualReplicaCountForReplicaSets(oldRSs); oldPods > 0 { + return true + } + for rsUID, podList := range podMap { + // If the pods belong to the new ReplicaSet, ignore. + if newRS != nil && newRS.UID == rsUID { + continue + } + if len(podList.Items) > 0 { + return true + } + } + return false +} + // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate". 
func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) { scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment) diff --git a/pkg/controller/deployment/recreate_test.go b/pkg/controller/deployment/recreate_test.go index 87ebe875b21..cb0ac0080f7 100644 --- a/pkg/controller/deployment/recreate_test.go +++ b/pkg/controller/deployment/recreate_test.go @@ -21,8 +21,10 @@ import ( "testing" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/v1" extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" @@ -82,3 +84,62 @@ func TestScaleDownOldReplicaSets(t *testing.T) { } } } + +func TestOldPodsRunning(t *testing.T) { + tests := []struct { + name string + + newRS *extensions.ReplicaSet + oldRSs []*extensions.ReplicaSet + podMap map[types.UID]*v1.PodList + + expected bool + }{ + { + name: "no old RSs", + expected: false, + }, + { + name: "old RSs with running pods", + oldRSs: []*extensions.ReplicaSet{rsWithUID("some-uid"), rsWithUID("other-uid")}, + podMap: podMapWithUIDs([]string{"some-uid", "other-uid"}), + expected: true, + }, + { + name: "old RSs without pods but with non-zero status replicas", + oldRSs: []*extensions.ReplicaSet{newRSWithStatus("rs-blabla", 0, 1, nil)}, + expected: true, + }, + { + name: "old RSs without pods or non-zero status replicas", + oldRSs: []*extensions.ReplicaSet{newRSWithStatus("rs-blabla", 0, 0, nil)}, + expected: false, + }, + } + + for _, test := range tests { + if expected, got := test.expected, oldPodsRunning(test.newRS, test.oldRSs, test.podMap); expected != got { + t.Errorf("%s: expected %t, got %t", test.name, expected, got) + } + } +} + +func 
rsWithUID(uid string) *extensions.ReplicaSet { + d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"}) + rs := newReplicaSet(d, fmt.Sprintf("foo-%s", uid), 0) + rs.UID = types.UID(uid) + return rs +} + +func podMapWithUIDs(uids []string) map[types.UID]*v1.PodList { + podMap := make(map[types.UID]*v1.PodList) + for _, uid := range uids { + podMap[types.UID(uid)] = &v1.PodList{ + Items: []v1.Pod{ + { /* supposedly a pod */ }, + { /* supposedly another pod */ }, + }, + } + } + return podMap +} diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index bee9cebceff..bf22d808ec7 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -135,7 +135,7 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.D // rsList should come from getReplicaSetsForDeployment(d). // podMap should come from getPodMapForDeployment(d, rsList). func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) ([]*extensions.ReplicaSet, error) { - syncedRSList := []*extensions.ReplicaSet{} + var syncedRSList []*extensions.ReplicaSet for _, rs := range rsList { // Add pod-template-hash information if it's not in the RS. // Otherwise, new RS produced by Deployment will overlap with pre-existing ones @@ -515,7 +515,6 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSe glog.V(4).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name) var errList []error - // TODO: This should be parallelized.
for i := int32(0); i < diff; i++ { rs := cleanableRSes[i] // Avoid delete replica set with non-zero replica counts diff --git a/pkg/controller/deployment/util/deployment_util.go b/pkg/controller/deployment/util/deployment_util.go index 490ea7a3398..3b5bcbb6d3a 100644 --- a/pkg/controller/deployment/util/deployment_util.go +++ b/pkg/controller/deployment/util/deployment_util.go @@ -570,7 +570,7 @@ func rsListFromClient(c clientset.Interface) rsListFunc { if err != nil { return nil, err } - ret := []*extensions.ReplicaSet{} + var ret []*extensions.ReplicaSet for i := range rsList.Items { ret = append(ret, &rsList.Items[i]) } @@ -827,9 +827,12 @@ func WaitForPodsHashPopulated(c extensionslisters.ReplicaSetLister, desiredGener } // LabelPodsWithHash labels all pods in the given podList with the new hash label. -// The returned bool value can be used to tell if all pods are actually labeled. func LabelPodsWithHash(podList *v1.PodList, c clientset.Interface, podLister corelisters.PodLister, namespace, name, hash string) error { for _, pod := range podList.Items { + // Ignore inactive Pods. + if !controller.IsPodActive(&pod) { + continue + } // Only label the pod that doesn't already have the new hash if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash { _, err := UpdatePodWithRetries(c.Core().Pods(namespace), podLister, pod.Namespace, pod.Name, diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index bb6c2698726..8f3cb0d5686 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -69,7 +69,7 @@ var _ = framework.KubeDescribe("Deployment", func() { It("RollingUpdateDeployment should delete old pods and create new ones", func() { testRollingUpdateDeployment(f) }) - It("RecreateDeployment should delete old pods and create new ones [Flaky]", func() { + It("RecreateDeployment should delete old pods and create new ones", func() { testRecreateDeployment(f) }) It("deployment should delete old replica sets", func() {