Merge pull request #43963 from kargakis/fix-recreate-syncs

Automatic merge from submit-queue (batch tested with PRs 43963, 43965)

Wait for clean old RSs statuses in the middle of Recreate rollouts

After https://github.com/kubernetes/kubernetes/pull/43508 merged, we started returning ReplicaSets that have no pods but still carry stale statuses back to the rollout functions. As a consequence, one of our e2e tests, which checks that a Recreate Deployment never runs pods from different versions at the same time, started flaking because the Deployment status may be incorrect. This change simply waits for the old ReplicaSets' statuses to be cleaned up before proceeding with scaling up the new ReplicaSet.
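
At its core, the fix gates the rest of the Recreate rollout on a helper that treats both observed old pods and non-zero old ReplicaSet statuses as "still running". Below is a minimal, self-contained sketch of that logic; the type names here are simplified stand-ins for `extensions.ReplicaSet`, `types.UID`, and `v1.PodList`, and the real helper is `oldPodsRunning` in the diff further down:

```go
package main

import "fmt"

// Simplified stand-ins for extensions.ReplicaSet and v1.PodList.
type replicaSet struct {
	uid            string
	statusReplicas int32 // what the ReplicaSet *thinks* it runs (rs.Status.Replicas)
}

type podList struct{ items []string }

// oldPodsRunning mirrors the helper added in this PR: the rollout is blocked
// while any old ReplicaSet still reports replicas in its status (possibly
// stale) or still owns observed pods.
func oldPodsRunning(newRS *replicaSet, oldRSs []*replicaSet, podMap map[string]*podList) bool {
	for _, rs := range oldRSs {
		if rs.statusReplicas > 0 {
			return true // stale or real: either way, wait for cleanup
		}
	}
	for uid, pods := range podMap {
		if newRS != nil && newRS.uid == uid {
			continue // pods of the new ReplicaSet do not block the rollout
		}
		if len(pods.items) > 0 {
			return true
		}
	}
	return false
}

func main() {
	// An old RS with no pods but a stale non-zero status must still block scale-up.
	stale := []*replicaSet{{uid: "old-rs", statusReplicas: 1}}
	fmt.Println(oldPodsRunning(nil, stale, nil)) // true
}
```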

Fixes https://github.com/kubernetes/kubernetes/issues/43864

@kubernetes/sig-apps-bugs
Merged by Kubernetes Submit Queue on 2017-04-06 02:52:18 -07:00 (committed by GitHub)
Commit b4ff65ddf2: 8 changed files with 104 additions and 27 deletions

pkg/controller/deployment/BUILD

@@ -71,6 +71,7 @@ go_test(
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/runtime",
         "//vendor:k8s.io/apimachinery/pkg/runtime/schema",
+        "//vendor:k8s.io/apimachinery/pkg/types",
         "//vendor:k8s.io/apimachinery/pkg/util/intstr",
         "//vendor:k8s.io/apimachinery/pkg/util/uuid",
         "//vendor:k8s.io/client-go/testing",

pkg/controller/deployment/deployment_controller.go

@@ -26,6 +26,7 @@ import (
     "time"

     "github.com/golang/glog"
     "k8s.io/apimachinery/pkg/api/errors"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/labels"
@@ -537,10 +538,8 @@ func (dc *DeploymentController) getPodMapForDeployment(d *extensions.Deployment,
         podMap[rs.UID] = &v1.PodList{}
     }
     for _, pod := range pods {
-        // Ignore inactive Pods since that's what ReplicaSet does.
-        if !controller.IsPodActive(pod) {
-            continue
-        }
+        // Do not ignore inactive Pods because Recreate Deployments need to verify that no
+        // Pods from older versions are running before spinning up new Pods.
         controllerRef := controller.GetControllerOf(pod)
         if controllerRef == nil {
             continue
@@ -614,6 +613,10 @@ func (dc *DeploymentController) syncDeployment(key string) error {
         return err
     }
     // List all Pods owned by this Deployment, grouped by their ReplicaSet.
+    // Current uses of the podMap are:
+    //
+    // * check if a Pod is labeled correctly with the pod-template-hash label.
+    // * check that no old Pods are running in the middle of Recreate Deployments.
     podMap, err := dc.getPodMapForDeployment(d, rsList)
     if err != nil {
         return err

pkg/controller/deployment/deployment_controller_test.go

@@ -560,18 +560,21 @@ func TestGetPodMapForReplicaSets(t *testing.T) {
     for _, podList := range podMap {
         podCount += len(podList.Items)
     }
-    if got, want := podCount, 2; got != want {
+    if got, want := podCount, 3; got != want {
         t.Errorf("podCount = %v, want %v", got, want)
     }
     if got, want := len(podMap), 2; got != want {
         t.Errorf("len(podMap) = %v, want %v", got, want)
     }
-    if got, want := len(podMap[rs1.UID].Items), 1; got != want {
+    if got, want := len(podMap[rs1.UID].Items), 2; got != want {
         t.Errorf("len(podMap[rs1]) = %v, want %v", got, want)
     }
-    if got, want := podMap[rs1.UID].Items[0].Name, "rs1-pod"; got != want {
-        t.Errorf("podMap[rs1] = [%v], want [%v]", got, want)
+    expect := map[string]struct{}{"rs1-pod": {}, "pod4": {}}
+    for _, pod := range podMap[rs1.UID].Items {
+        if _, ok := expect[pod.Name]; !ok {
+            t.Errorf("unexpected pod name for rs1: %s", pod.Name)
+        }
     }
     if got, want := len(podMap[rs2.UID].Items), 1; got != want {
         t.Errorf("len(podMap[rs2]) = %v, want %v", got, want)

pkg/controller/deployment/recreate.go

@@ -21,6 +21,7 @@ import (
     "k8s.io/kubernetes/pkg/api/v1"
     extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
     "k8s.io/kubernetes/pkg/controller"
+    "k8s.io/kubernetes/pkg/controller/deployment/util"
 )

 // rolloutRecreate implements the logic for recreating a replica set.
@@ -43,18 +44,12 @@ func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList
         return dc.syncRolloutStatus(allRSs, newRS, d)
     }

-    newStatus := calculateStatus(allRSs, newRS, d)
     // Do not process a deployment when it has old pods running.
-    if newStatus.UpdatedReplicas == 0 {
-        for _, podList := range podMap {
-            if len(podList.Items) > 0 {
-                return dc.syncRolloutStatus(allRSs, newRS, d)
-            }
-        }
+    if oldPodsRunning(newRS, oldRSs, podMap) {
+        return dc.syncRolloutStatus(allRSs, newRS, d)
     }

     // If we need to create a new RS, create it now.
-    // TODO: Create a new RS without re-listing all RSs.
     if newRS == nil {
         newRS, oldRSs, err = dc.getAllReplicaSetsAndSyncRevision(d, rsList, podMap, true)
         if err != nil {
@@ -64,14 +59,9 @@ func (dc *DeploymentController) rolloutRecreate(d *extensions.Deployment, rsList
     }

     // scale up new replica set.
-    scaledUp, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d)
-    if err != nil {
+    if _, err := dc.scaleUpNewReplicaSetForRecreate(newRS, d); err != nil {
         return err
     }
-    if scaledUp {
-        // Update DeploymentStatus.
-        return dc.syncRolloutStatus(allRSs, newRS, d)
-    }

     // Sync deployment status.
     return dc.syncRolloutStatus(allRSs, newRS, d)
@@ -98,6 +88,23 @@ func (dc *DeploymentController) scaleDownOldReplicaSetsForRecreate(oldRSs []*ext
     return scaled, nil
 }

+// oldPodsRunning returns whether there are old pods running or any of the old ReplicaSets thinks that it runs pods.
+func oldPodsRunning(newRS *extensions.ReplicaSet, oldRSs []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) bool {
+    if oldPods := util.GetActualReplicaCountForReplicaSets(oldRSs); oldPods > 0 {
+        return true
+    }
+    for rsUID, podList := range podMap {
+        // If the pods belong to the new ReplicaSet, ignore.
+        if newRS != nil && newRS.UID == rsUID {
+            continue
+        }
+        if len(podList.Items) > 0 {
+            return true
+        }
+    }
+    return false
+}
+
 // scaleUpNewReplicaSetForRecreate scales up new replica set when deployment strategy is "Recreate".
 func (dc *DeploymentController) scaleUpNewReplicaSetForRecreate(newRS *extensions.ReplicaSet, deployment *extensions.Deployment) (bool, error) {
     scaled, _, err := dc.scaleReplicaSetAndRecordEvent(newRS, *(deployment.Spec.Replicas), deployment)
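
Design note: oldPodsRunning deliberately checks two signals. The pod map catches pods that are actually running, while util.GetActualReplicaCountForReplicaSets catches old ReplicaSets whose status.replicas is still non-zero, i.e. the stale statuses described in the PR summary; relying on observed pods alone would let the rollout proceed before those statuses are cleaned up.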

pkg/controller/deployment/recreate_test.go

@@ -21,8 +21,10 @@ import (
     "testing"

     "k8s.io/apimachinery/pkg/runtime"
+    "k8s.io/apimachinery/pkg/types"
     "k8s.io/client-go/tools/record"
     "k8s.io/kubernetes/pkg/api"
+    "k8s.io/kubernetes/pkg/api/v1"
     extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
     "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
     informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
@@ -82,3 +84,62 @@ func TestScaleDownOldReplicaSets(t *testing.T) {
         }
     }
 }
+
+func TestOldPodsRunning(t *testing.T) {
+    tests := []struct {
+        name     string
+        newRS    *extensions.ReplicaSet
+        oldRSs   []*extensions.ReplicaSet
+        podMap   map[types.UID]*v1.PodList
+        expected bool
+    }{
+        {
+            name:     "no old RSs",
+            expected: false,
+        },
+        {
+            name:     "old RSs with running pods",
+            oldRSs:   []*extensions.ReplicaSet{rsWithUID("some-uid"), rsWithUID("other-uid")},
+            podMap:   podMapWithUIDs([]string{"some-uid", "other-uid"}),
+            expected: true,
+        },
+        {
+            name:     "old RSs without pods but with non-zero status replicas",
+            oldRSs:   []*extensions.ReplicaSet{newRSWithStatus("rs-blabla", 0, 1, nil)},
+            expected: true,
+        },
+        {
+            name:     "old RSs without pods or non-zero status replicas",
+            oldRSs:   []*extensions.ReplicaSet{newRSWithStatus("rs-blabla", 0, 0, nil)},
+            expected: false,
+        },
+    }
+
+    for _, test := range tests {
+        if expected, got := test.expected, oldPodsRunning(test.newRS, test.oldRSs, test.podMap); expected != got {
+            t.Errorf("%s: expected %t, got %t", test.name, expected, got)
+        }
+    }
+}
+
+func rsWithUID(uid string) *extensions.ReplicaSet {
+    d := newDeployment("foo", 1, nil, nil, nil, map[string]string{"foo": "bar"})
+    rs := newReplicaSet(d, fmt.Sprintf("foo-%s", uid), 0)
+    rs.UID = types.UID(uid)
+    return rs
+}
+
+func podMapWithUIDs(uids []string) map[types.UID]*v1.PodList {
+    podMap := make(map[types.UID]*v1.PodList)
+    for _, uid := range uids {
+        podMap[types.UID(uid)] = &v1.PodList{
+            Items: []v1.Pod{
+                { /* supposedly a pod */ },
+                { /* supposedly another pod */ },
+            },
+        }
+    }
+    return podMap
+}
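
To run the new unit test in isolation, something like `go test ./pkg/controller/deployment/ -run TestOldPodsRunning -v` from the repository root should work; the package path is inferred from the files touched here rather than stated in the diff.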

pkg/controller/deployment/sync.go

@@ -135,7 +135,7 @@ func (dc *DeploymentController) getAllReplicaSetsAndSyncRevision(d *extensions.D
 // rsList should come from getReplicaSetsForDeployment(d).
 // podMap should come from getPodMapForDeployment(d, rsList).
 func (dc *DeploymentController) rsAndPodsWithHashKeySynced(d *extensions.Deployment, rsList []*extensions.ReplicaSet, podMap map[types.UID]*v1.PodList) ([]*extensions.ReplicaSet, error) {
-    syncedRSList := []*extensions.ReplicaSet{}
+    var syncedRSList []*extensions.ReplicaSet
     for _, rs := range rsList {
         // Add pod-template-hash information if it's not in the RS.
         // Otherwise, new RS produced by Deployment will overlap with pre-existing ones

@@ -515,7 +515,6 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*extensions.ReplicaSe
     glog.V(4).Infof("Looking to cleanup old replica sets for deployment %q", deployment.Name)

     var errList []error
-    // TODO: This should be parallelized.
     for i := int32(0); i < diff; i++ {
         rs := cleanableRSes[i]
         // Avoid delete replica set with non-zero replica counts

pkg/controller/deployment/util/deployment_util.go

@@ -570,7 +570,7 @@ func rsListFromClient(c clientset.Interface) rsListFunc {
         if err != nil {
             return nil, err
         }
-        ret := []*extensions.ReplicaSet{}
+        var ret []*extensions.ReplicaSet
         for i := range rsList.Items {
             ret = append(ret, &rsList.Items[i])
         }

@@ -827,9 +827,12 @@ func WaitForPodsHashPopulated(c extensionslisters.ReplicaSetLister, desiredGener
 }

 // LabelPodsWithHash labels all pods in the given podList with the new hash label.
-// The returned bool value can be used to tell if all pods are actually labeled.
 func LabelPodsWithHash(podList *v1.PodList, c clientset.Interface, podLister corelisters.PodLister, namespace, name, hash string) error {
     for _, pod := range podList.Items {
+        // Ignore inactive Pods.
+        if !controller.IsPodActive(&pod) {
+            continue
+        }
         // Only label the pod that doesn't already have the new hash
         if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
             _, err := UpdatePodWithRetries(c.Core().Pods(namespace), podLister, pod.Namespace, pod.Name,
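
For context, controller.IsPodActive, which the filter added above relies on, amounts to a check like this sketch (reconstructed for illustration; it is not part of this diff):

```go
// A pod counts as active unless it has terminated or is being deleted.
func isPodActive(p *v1.Pod) bool {
	return p.Status.Phase != v1.PodSucceeded &&
		p.Status.Phase != v1.PodFailed &&
		p.DeletionTimestamp == nil
}
```

This is why the pod map can now safely include inactive pods for the Recreate check: the labeling path skips them here instead.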

test/e2e/deployment.go

@@ -69,7 +69,7 @@ var _ = framework.KubeDescribe("Deployment", func() {
     It("RollingUpdateDeployment should delete old pods and create new ones", func() {
         testRollingUpdateDeployment(f)
     })
-    It("RecreateDeployment should delete old pods and create new ones [Flaky]", func() {
+    It("RecreateDeployment should delete old pods and create new ones", func() {
         testRecreateDeployment(f)
     })
     It("deployment should delete old replica sets", func() {