From da58172283ee38316373e7197e5e91d8705556cf Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Wed, 10 Feb 2016 17:49:11 -0800 Subject: [PATCH 1/6] Ensure Deployment labels adopted ReplicaSets and pods --- .../deployment/deployment_controller.go | 4 + pkg/util/deployment/deployment.go | 152 ++++++++++++++++-- pkg/util/labels/labels.go | 40 +++++ 3 files changed, 186 insertions(+), 10 deletions(-) diff --git a/pkg/controller/deployment/deployment_controller.go b/pkg/controller/deployment/deployment_controller.go index b1eaa67897f..060dfc8a44f 100644 --- a/pkg/controller/deployment/deployment_controller.go +++ b/pkg/controller/deployment/deployment_controller.go @@ -665,6 +665,10 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen newRevision := strconv.FormatInt(maxOldRevision+1, 10) existingNewRS, err := deploymentutil.GetNewReplicaSetFromList(deployment, dc.client, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector) + return &podList, err + }, func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector) }) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 5f5baa782e3..e3359a4b7bd 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -25,11 +25,13 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/integer" intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" podutil "k8s.io/kubernetes/pkg/util/pod" + "k8s.io/kubernetes/pkg/util/wait" ) const ( @@ -55,9 +57,12 @@ func GetOldReplicaSets(deployment extensions.Deployment, c clientset.Interface) }) } +type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error) +type podListFunc func(string, api.ListOptions) (*api.PodList, error) + // GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets. -func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRSList func(string, api.ListOptions) ([]extensions.ReplicaSet, error)) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { +func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { namespace := deployment.ObjectMeta.Namespace selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) if err != nil { @@ -74,7 +79,7 @@ func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.In // TODO: Right now we list all replica sets and then filter. We should add an API for this. oldRSs := map[string]extensions.ReplicaSet{} allOldRSs := map[string]extensions.ReplicaSet{} - rsList, err := getRSList(namespace, options) + rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList) if err != nil { return nil, nil, fmt.Errorf("error listing replica sets: %v", err) } @@ -113,6 +118,9 @@ func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.In // Returns nil if the new replica set doesn't exist yet. func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) { return GetNewReplicaSetFromList(deployment, c, + func(namespace string, options api.ListOptions) (*api.PodList, error) { + return c.Core().Pods(namespace).List(options) + }, func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) { rsList, err := c.Extensions().ReplicaSets(namespace).List(options) return rsList.Items, err @@ -121,14 +129,8 @@ func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) ( // GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function. // Returns nil if the new replica set doesn't exist yet. -func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getRSList func(string, api.ListOptions) ([]extensions.ReplicaSet, error)) (*extensions.ReplicaSet, error) { - namespace := deployment.ObjectMeta.Namespace - selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) - if err != nil { - return nil, fmt.Errorf("invalid label selector: %v", err) - } - - rsList, err := getRSList(namespace, api.ListOptions{LabelSelector: selector}) +func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) { + rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList) if err != nil { return nil, fmt.Errorf("error listing ReplicaSets: %v", err) } @@ -144,6 +146,136 @@ func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Inte return nil, nil } +// rsListWithHashKeySynced returns a list of rs the deployment targets, with pod-template-hash information synced. +func rsListWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, error) { + namespace := deployment.Namespace + selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) + if err != nil { + return nil, err + } + options := api.ListOptions{LabelSelector: selector} + rsList, err := getRSList(namespace, options) + if err != nil { + return nil, err + } + syncedRSList := []extensions.ReplicaSet{} + for _, rs := range rsList { + // Add pod-template-hash information if it's not in the rs + if !labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { + updatedRS, err := addHashKeyToReplicaSet(deployment, c, rs, getPodList) + if err != nil { + return nil, err + } + syncedRSList = append(syncedRSList, *updatedRS) + } + syncedRSList = append(syncedRSList, rs) + } + return syncedRSList, nil +} + +// addHashKeyToReplicaSet adds pod-template-hash information to the given rs with the following steps: +// 1. Add hash label to the rs's pod template +// 2. Add hash label to all pods this rs owns +// 3. Add hash label to the rs's selector +// 4. Clean up all pods this rs owns but without the hash label (orphaned pods) +func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) { + if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { + return &rs, nil + } + namespace := deployment.Namespace + hash := podutil.GetPodTemplateSpecHash(*rs.Spec.Template) + // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. + updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { + updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + }) + if err != nil { + return nil, err + } + + // 2. Update all pods managed by the rs to have the new hash label, so they are correctly adopted. + selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) + if err != nil { + return nil, err + } + options := api.ListOptions{LabelSelector: selector} + podList, err := getPodList(namespace, options) + if err != nil { + return nil, err + } + for _, pod := range podList.Items { + pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + delay, maxRetries := 3, 3 + for i := 0; i < maxRetries; i++ { + _, err = c.Core().Pods(namespace).Update(&pod) + if err != nil { + time.Sleep(time.Second * time.Duration(delay)) + delay *= delay + } else { + break + } + } + if err != nil { + return nil, err + } + } + + // 3. Update rs selector + // Copy the old selector, so that we can scrub out any orphaned pods + oldSelector := updatedRS.Spec.Selector + // Update the selector of the rs so it manages all the pods we updated above + if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { + updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) + }); err != nil { + return nil, err + } + + // 4. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager + // doesn't see the update to its pod template and creates a new pod with the old labels after + // we've finished re-adopting existing pods to the rs. + selector, err = unversioned.LabelSelectorAsSelector(oldSelector) + if err != nil { + return nil, err + } + options = api.ListOptions{LabelSelector: selector} + podList, err = getPodList(namespace, options) + hashString := fmt.Sprintf("%d", hash) + for _, pod := range podList.Items { + if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hashString { + if err := c.Core().Pods(namespace).Delete(pod.Name, nil); err != nil { + return nil, err + } + } + } + + return updatedRS, nil +} + +type updateFunc func(rs *extensions.ReplicaSet) + +func updateRSWithRetries(rsClient extensions_unversioned.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateFunc) (*extensions.ReplicaSet, error) { + var err error + oldRs := rs + err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { + // Apply the update, then attempt to push it to the apiserver. + applyUpdate(rs) + if rs, err = rsClient.Update(rs); err == nil { + // rs contains the latest controller post update + return true, nil + } + // Update the controller with the latest resource version, if the update failed we + // can't trust rs so use oldRs.Name. + if rs, err = rsClient.Get(oldRs.Name); err != nil { + // The Get failed: Value in rs cannot be trusted. + rs = oldRs + } + // The Get passed: rs contains the latest controller, expect a poll for the update. + return false, nil + }) + // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned + // controller contains the applied update. + return rs, err +} + // Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet. func GetNewReplicaSetTemplate(deployment extensions.Deployment) api.PodTemplateSpec { // newRS will have the same template as in deployment spec, plus a unique label in some cases. diff --git a/pkg/util/labels/labels.go b/pkg/util/labels/labels.go index c32b862cd49..068b5fa89bb 100644 --- a/pkg/util/labels/labels.go +++ b/pkg/util/labels/labels.go @@ -54,6 +54,19 @@ func CloneAndRemoveLabel(labels map[string]string, labelKey string) map[string]s return newLabels } +// AddLabel returns a map with the given key and value added to the given map. +func AddLabel(labels map[string]string, labelKey string, labelValue uint32) map[string]string { + if labelKey == "" { + // Dont need to add a label. + return labels + } + if labels == nil { + labels = make(map[string]string) + } + labels[labelKey] = fmt.Sprintf("%d", labelValue) + return labels +} + // Clones the given selector and returns a new selector with the given key and value added. // Returns the given selector, if labelKey is empty. func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector { @@ -93,3 +106,30 @@ func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey stri return newSelector } + +// AddLabelToSelector returns a selector with the given key and value added to the given selector's MatchLabels. +func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector { + if labelKey == "" { + // Dont need to add a label. + return selector + } + if selector.MatchLabels == nil { + selector.MatchLabels = make(map[string]string) + } + selector.MatchLabels[labelKey] = fmt.Sprintf("%d", labelValue) + return selector +} + +// SelectorHasLabel checks if the given selector contains the given label key in its MatchLabels or MatchExpressions +func SelectorHasLabel(selector *unversioned.LabelSelector, labelKey string) bool { + _, found := selector.MatchLabels[labelKey] + if found { + return true + } + for _, exp := range selector.MatchExpressions { + if exp.Key == labelKey && exp.Operator != unversioned.LabelSelectorOpDoesNotExist { + return true + } + } + return false +} From 11fdbff97f37276889594682805824afd8a04e15 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Thu, 11 Feb 2016 10:57:42 -0800 Subject: [PATCH 2/6] Address comments; fix test failures; add e2e tests; update RS's label too --- pkg/util/deployment/deployment.go | 95 +++++++++++--------------- pkg/util/deployment/deployment_test.go | 49 ++++++++++++- pkg/util/labels/labels.go | 8 +-- test/e2e/deployment.go | 82 ++++++++++++++++++++++ 4 files changed, 171 insertions(+), 63 deletions(-) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index e3359a4b7bd..a3c776ef17b 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -63,25 +63,14 @@ type podListFunc func(string, api.ListOptions) (*api.PodList, error) // GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions. // Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets. func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) { - namespace := deployment.ObjectMeta.Namespace - selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) - if err != nil { - return nil, nil, fmt.Errorf("invalid label selector: %v", err) - } - - // 1. Find all pods whose labels match deployment.Spec.Selector - options := api.ListOptions{LabelSelector: selector} - podList, err := getPodList(namespace, options) - if err != nil { - return nil, nil, fmt.Errorf("error listing pods: %v", err) - } - // 2. Find the corresponding replica sets for pods in podList. + // Find all pods whose labels match deployment.Spec.Selector, and corresponding replica sets for pods in podList. + // All pods and replica sets are labeled with pod-template-hash to prevent overlapping // TODO: Right now we list all replica sets and then filter. We should add an API for this. oldRSs := map[string]extensions.ReplicaSet{} allOldRSs := map[string]extensions.ReplicaSet{} - rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList) + rsList, podList, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList) if err != nil { - return nil, nil, fmt.Errorf("error listing replica sets: %v", err) + return nil, nil, fmt.Errorf("error labeling replica sets and pods with pod-template-hash: %v", err) } newRSTemplate := GetNewReplicaSetTemplate(deployment) for _, pod := range podList.Items { @@ -130,7 +119,7 @@ func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) ( // GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function. // Returns nil if the new replica set doesn't exist yet. func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) { - rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList) + rsList, _, err := rsAndPodsWithHashKeySynced(deployment, c, getRSList, getPodList) if err != nil { return nil, fmt.Errorf("error listing ReplicaSets: %v", err) } @@ -146,54 +135,45 @@ func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Inte return nil, nil } -// rsListWithHashKeySynced returns a list of rs the deployment targets, with pod-template-hash information synced. -func rsListWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, error) { +// rsAndPodsWithHashKeySynced returns a list of rs the deployment targets, with pod-template-hash information synced. +func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, *api.PodList, error) { namespace := deployment.Namespace selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) if err != nil { - return nil, err + return nil, nil, err } options := api.ListOptions{LabelSelector: selector} rsList, err := getRSList(namespace, options) if err != nil { - return nil, err + return nil, nil, err } syncedRSList := []extensions.ReplicaSet{} for _, rs := range rsList { - // Add pod-template-hash information if it's not in the rs - if !labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { - updatedRS, err := addHashKeyToReplicaSet(deployment, c, rs, getPodList) - if err != nil { - return nil, err - } - syncedRSList = append(syncedRSList, *updatedRS) + // Add pod-template-hash information if it's not in the RS. + // Otherwise, new RS produced by Deployment will overlap we pre-existing ones + // that aren't constrained by the pod-template-hash. + syncedRS, err := addHashKeyToRSAndPods(deployment, c, rs, getPodList) + if err != nil { + return nil, nil, err } - syncedRSList = append(syncedRSList, rs) + syncedRSList = append(syncedRSList, *syncedRS) } - return syncedRSList, nil + syncedPodList, err := getPodList(namespace, options) + return syncedRSList, syncedPodList, nil } -// addHashKeyToReplicaSet adds pod-template-hash information to the given rs with the following steps: -// 1. Add hash label to the rs's pod template -// 2. Add hash label to all pods this rs owns -// 3. Add hash label to the rs's selector -// 4. Clean up all pods this rs owns but without the hash label (orphaned pods) -func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) { +// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps: +// 1. Add hash label to all pods this rs owns +// 2. Add hash label to the rs's pod template, the rs's label, and the rs's selector +// 3. Clean up all pods this rs owns but without the hash label (orphaned pods) +func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) { if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { return &rs, nil } namespace := deployment.Namespace - hash := podutil.GetPodTemplateSpecHash(*rs.Spec.Template) - // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. - updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { - updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - }) - if err != nil { - return nil, err - } - - // 2. Update all pods managed by the rs to have the new hash label, so they are correctly adopted. - selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) + hash := fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(*rs.Spec.Template)) + // 1. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. + selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { return nil, err } @@ -207,29 +187,31 @@ func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interf delay, maxRetries := 3, 3 for i := 0; i < maxRetries; i++ { _, err = c.Core().Pods(namespace).Update(&pod) - if err != nil { - time.Sleep(time.Second * time.Duration(delay)) - delay *= delay - } else { + if err == nil { break } + time.Sleep(time.Second * time.Duration(delay)) + delay *= delay } if err != nil { return nil, err } } - // 3. Update rs selector + // 2. Update rs label, rs template label, and rs selector to include the new hash label // Copy the old selector, so that we can scrub out any orphaned pods - oldSelector := updatedRS.Spec.Selector + oldSelector := rs.Spec.Selector // Update the selector of the rs so it manages all the pods we updated above - if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { + updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { + updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) - }); err != nil { + }) + if err != nil { return nil, err } - // 4. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager + // 3. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager // doesn't see the update to its pod template and creates a new pod with the old labels after // we've finished re-adopting existing pods to the rs. selector, err = unversioned.LabelSelectorAsSelector(oldSelector) @@ -238,9 +220,8 @@ func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interf } options = api.ListOptions{LabelSelector: selector} podList, err = getPodList(namespace, options) - hashString := fmt.Sprintf("%d", hash) for _, pod := range podList.Items { - if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hashString { + if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hash { if err := c.Core().Pods(namespace).Delete(pod.Name, nil); err != nil { return nil, err } diff --git a/pkg/util/deployment/deployment_test.go b/pkg/util/deployment/deployment_test.go index 4383bda02da..cab72222166 100644 --- a/pkg/util/deployment/deployment_test.go +++ b/pkg/util/deployment/deployment_test.go @@ -26,10 +26,43 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/client/testing/fake" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" "k8s.io/kubernetes/pkg/client/unversioned/testclient/simple" "k8s.io/kubernetes/pkg/runtime" ) +func addListRSReactor(fakeClient *fake.Clientset, obj runtime.Object) *fake.Clientset { + fakeClient.AddReactor("list", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, obj, nil + }) + return fakeClient +} + +func addListPodsReactor(fakeClient *fake.Clientset, obj runtime.Object) *fake.Clientset { + fakeClient.AddReactor("list", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + return true, obj, nil + }) + return fakeClient +} + +func addUpdateRSReactor(fakeClient *fake.Clientset) *fake.Clientset { + fakeClient.AddReactor("update", "replicasets", func(action core.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(testclient.UpdateAction).GetObject().(*extensions.ReplicaSet) + return true, obj, nil + }) + return fakeClient +} + +func addUpdatePodsReactor(fakeClient *fake.Clientset) *fake.Clientset { + fakeClient.AddReactor("update", "pods", func(action core.Action) (handled bool, ret runtime.Object, err error) { + obj := action.(testclient.UpdateAction).GetObject().(*api.Pod) + return true, obj, nil + }) + return fakeClient +} + func newPod(now time.Time, ready bool, beforeSec int) api.Pod { conditionStatus := api.ConditionFalse if ready { @@ -313,12 +346,24 @@ func TestGetOldRCs(t *testing.T) { } for _, test := range tests { - rss, _, err := GetOldReplicaSets(newDeployment, fake.NewSimpleClientset(test.objs...)) + fakeClient := &fake.Clientset{} + fakeClient = addListPodsReactor(fakeClient, test.objs[0]) + fakeClient = addListRSReactor(fakeClient, test.objs[1]) + fakeClient = addUpdatePodsReactor(fakeClient) + fakeClient = addUpdateRSReactor(fakeClient) + rss, _, err := GetOldReplicaSets(newDeployment, fakeClient) if err != nil { t.Errorf("In test case %s, got unexpected error %v", test.test, err) } if !equal(rss, test.expected) { - t.Errorf("In test case %q, expected %v, got %v", test.test, test.expected, rss) + t.Errorf("In test case %q, expected:", test.test) + for _, rs := range test.expected { + t.Errorf("rs = %+v", rs) + } + t.Errorf("In test case %q, got:", test.test) + for _, rs := range rss { + t.Errorf("rs = %+v", rs) + } } } } diff --git a/pkg/util/labels/labels.go b/pkg/util/labels/labels.go index 068b5fa89bb..b4e3f6b89b8 100644 --- a/pkg/util/labels/labels.go +++ b/pkg/util/labels/labels.go @@ -55,7 +55,7 @@ func CloneAndRemoveLabel(labels map[string]string, labelKey string) map[string]s } // AddLabel returns a map with the given key and value added to the given map. -func AddLabel(labels map[string]string, labelKey string, labelValue uint32) map[string]string { +func AddLabel(labels map[string]string, labelKey string, labelValue string) map[string]string { if labelKey == "" { // Dont need to add a label. return labels @@ -63,7 +63,7 @@ func AddLabel(labels map[string]string, labelKey string, labelValue uint32) map[ if labels == nil { labels = make(map[string]string) } - labels[labelKey] = fmt.Sprintf("%d", labelValue) + labels[labelKey] = labelValue return labels } @@ -108,7 +108,7 @@ func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey stri } // AddLabelToSelector returns a selector with the given key and value added to the given selector's MatchLabels. -func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector { +func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, labelValue string) *unversioned.LabelSelector { if labelKey == "" { // Dont need to add a label. return selector @@ -116,7 +116,7 @@ func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, la if selector.MatchLabels == nil { selector.MatchLabels = make(map[string]string) } - selector.MatchLabels[labelKey] = fmt.Sprintf("%d", labelValue) + selector.MatchLabels[labelKey] = labelValue return selector } diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index 8c149b97a45..3d280db4f6b 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -66,6 +66,9 @@ var _ = Describe("Deployment", func() { It("[Flaky] deployment should support rollback when there's replica set with no revision", func() { testRollbackDeploymentRSNoRevision(f) }) + It("deployment should label adopted RSs and pods", func() { + testDeploymentLabelAdopted(f) + }) }) func newRS(rsName string, replicas int, rsPodLabels map[string]string, imageName string, image string) *extensions.ReplicaSet { @@ -272,6 +275,17 @@ func testRollingUpdateDeployment(f *Framework) { // Check if it's updated to revision 1 correctly checkDeploymentRevision(c, ns, deploymentName, "1", "redis", "redis") + + // There should be 1 old RS (nginx-controller, which is adopted) + deployment, err := c.Extensions().Deployments(ns).Get(deploymentName) + Expect(err).NotTo(HaveOccurred()) + _, allOldRSs, err := deploymentutil.GetOldReplicaSets(*deployment, c) + Expect(err).NotTo(HaveOccurred()) + Expect(len(allOldRSs)).Should(Equal(1)) + // The old RS should contain pod-template-hash in its selector, label, and template label + Expect(len(allOldRSs[0].Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0)) + Expect(len(allOldRSs[0].Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0)) + Expect(len(allOldRSs[0].Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0)) } func testRollingUpdateDeploymentEvents(f *Framework) { @@ -797,3 +811,71 @@ func testRollbackDeploymentRSNoRevision(f *Framework) { // Check if it's still revision 3 checkDeploymentRevision(c, ns, deploymentName, "3", deploymentImageName, deploymentImage) } + +func testDeploymentLabelAdopted(f *Framework) { + ns := f.Namespace.Name + // TODO: remove unversionedClient when the refactoring is done. Currently some + // functions like verifyPod still expects a unversioned#Client. + unversionedClient := f.Client + c := clientset.FromUnversionedClient(unversionedClient) + // Create nginx pods. + podName := "nginx" + podLabels := map[string]string{"name": podName} + + rsName := "nginx-controller" + replicas := 3 + _, err := c.Extensions().ReplicaSets(ns).Create(newRS(rsName, replicas, podLabels, podName, podName)) + Expect(err).NotTo(HaveOccurred()) + // Verify that the required pods have come up. + err = verifyPods(unversionedClient, ns, podName, false, 3) + if err != nil { + Logf("error in waiting for pods to come up: %s", err) + Expect(err).NotTo(HaveOccurred()) + } + + // Create a nginx deployment to adopt the old rs. + deploymentName := "nginx-deployment" + Logf("Creating deployment %s", deploymentName) + _, err = c.Extensions().Deployments(ns).Create(newDeployment(deploymentName, replicas, podLabels, podName, podName, extensions.RollingUpdateDeploymentStrategyType, nil)) + Expect(err).NotTo(HaveOccurred()) + defer func() { + deployment, err := c.Extensions().Deployments(ns).Get(deploymentName) + Expect(err).NotTo(HaveOccurred()) + Logf("deleting deployment %s", deploymentName) + Expect(c.Extensions().Deployments(ns).Delete(deploymentName, nil)).NotTo(HaveOccurred()) + // TODO: remove this once we can delete replica sets with deployment + newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c) + Expect(err).NotTo(HaveOccurred()) + Expect(c.Extensions().ReplicaSets(ns).Delete(newRS.Name, nil)).NotTo(HaveOccurred()) + }() + + err = waitForDeploymentStatus(c, ns, deploymentName, replicas, replicas-1, replicas+1, 0) + Expect(err).NotTo(HaveOccurred()) + + // Check if it's updated to revision 1 correctly + checkDeploymentRevision(c, ns, deploymentName, "1", "nginx", "nginx") + + // There should be no old RSs (overlapping RS) + deployment, err := c.Extensions().Deployments(ns).Get(deploymentName) + Expect(err).NotTo(HaveOccurred()) + oldRSs, allOldRSs, err := deploymentutil.GetOldReplicaSets(*deployment, c) + Expect(err).NotTo(HaveOccurred()) + Expect(len(oldRSs)).Should(Equal(0)) + Expect(len(allOldRSs)).Should(Equal(0)) + // New RS should contain pod-template-hash in its selector, label, and template label + newRS, err := deploymentutil.GetNewReplicaSet(*deployment, c) + Expect(err).NotTo(HaveOccurred()) + Expect(len(newRS.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0)) + Expect(len(newRS.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0)) + Expect(len(newRS.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0)) + // All pods targeted by the deployment should contain pod-template-hash in their labels, and there should be only 3 pods + selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) + Expect(err).NotTo(HaveOccurred()) + options := api.ListOptions{LabelSelector: selector} + pods, err := c.Core().Pods(ns).List(options) + Expect(err).NotTo(HaveOccurred()) + for _, pod := range pods.Items { + Expect(len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey])).Should(BeNumerically(">", 0)) + } + Expect(len(pods.Items)).Should(Equal(replicas)) +} From 14bab2bb3a1cde7390394d39509fb78026e6db4c Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Wed, 17 Feb 2016 12:54:21 -0800 Subject: [PATCH 3/6] Address comments --- pkg/util/deployment/deployment.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index a3c776ef17b..12f4ac4c3a2 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -25,7 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" + unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/integer" intstrutil "k8s.io/kubernetes/pkg/util/intstr" @@ -183,8 +183,13 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa return nil, err } for _, pod := range podList.Items { + // If the pod already has the new hash label, avoid re-labeling it + if len(pod.Labels) > 0 && len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) > 0 { + continue + } pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) delay, maxRetries := 3, 3 + podName := pod.Name for i := 0; i < maxRetries; i++ { _, err = c.Core().Pods(namespace).Update(&pod) if err == nil { @@ -192,6 +197,11 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa } time.Sleep(time.Second * time.Duration(delay)) delay *= delay + getPod, err := c.Core().Pods(namespace).Get(podName) + if err != nil { + return nil, err + } + pod = *getPod } if err != nil { return nil, err @@ -233,7 +243,7 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa type updateFunc func(rs *extensions.ReplicaSet) -func updateRSWithRetries(rsClient extensions_unversioned.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateFunc) (*extensions.ReplicaSet, error) { +func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateFunc) (*extensions.ReplicaSet, error) { var err error oldRs := rs err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { From 0e5da8460d9f68c8beedc4a045b4c3917cf045b5 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Thu, 18 Feb 2016 11:45:24 -0800 Subject: [PATCH 4/6] Address comments --- pkg/util/deployment/deployment.go | 119 ++++++++++++++++++------------ 1 file changed, 70 insertions(+), 49 deletions(-) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 12f4ac4c3a2..4e1e3a1e281 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/util/integer" @@ -163,16 +164,25 @@ func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.In } // addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps: -// 1. Add hash label to all pods this rs owns -// 2. Add hash label to the rs's pod template, the rs's label, and the rs's selector -// 3. Clean up all pods this rs owns but without the hash label (orphaned pods) +// 1. Add hash label to the rs's pod template +// 2. Add hash label to all pods this rs owns +// 3. Add hash label to the rs's label and selector +// 4. Add hash label to all pods this rs owns but without the hash label (orphaned pods) func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) { if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { return &rs, nil } namespace := deployment.Namespace hash := fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(*rs.Spec.Template)) - // 1. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. + // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. + updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { + updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + }) + if err != nil { + return nil, err + } + + // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector) if err != nil { return nil, err @@ -182,46 +192,22 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa if err != nil { return nil, err } - for _, pod := range podList.Items { - // If the pod already has the new hash label, avoid re-labeling it - if len(pod.Labels) > 0 && len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) > 0 { - continue - } - pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - delay, maxRetries := 3, 3 - podName := pod.Name - for i := 0; i < maxRetries; i++ { - _, err = c.Core().Pods(namespace).Update(&pod) - if err == nil { - break - } - time.Sleep(time.Second * time.Duration(delay)) - delay *= delay - getPod, err := c.Core().Pods(namespace).Get(podName) - if err != nil { - return nil, err - } - pod = *getPod - } - if err != nil { - return nil, err - } - } - - // 2. Update rs label, rs template label, and rs selector to include the new hash label - // Copy the old selector, so that we can scrub out any orphaned pods - oldSelector := rs.Spec.Selector - // Update the selector of the rs so it manages all the pods we updated above - updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { - updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) - }) - if err != nil { + if err = labelPodsWithHash(podList, c, namespace, hash); err != nil { return nil, err } - // 3. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager + // 3. Update rs label, rs template label, and rs selector to include the new hash label + // Copy the old selector, so that we can scrub out any orphaned pods + oldSelector := rs.Spec.Selector + // Update the selector of the rs so it manages all the pods we updated above + if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { + updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) + }); err != nil { + return nil, err + } + + // 4. Label any orphaned pods that don't have the new label, this can happen if the rs manager // doesn't see the update to its pod template and creates a new pod with the old labels after // we've finished re-adopting existing pods to the rs. selector, err = unversioned.LabelSelectorAsSelector(oldSelector) @@ -230,20 +216,36 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa } options = api.ListOptions{LabelSelector: selector} podList, err = getPodList(namespace, options) - for _, pod := range podList.Items { - if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hash { - if err := c.Core().Pods(namespace).Delete(pod.Name, nil); err != nil { - return nil, err - } - } + if err != nil { + return nil, err + } + if err = labelPodsWithHash(podList, c, namespace, hash); err != nil { + return nil, err } return updatedRS, nil } -type updateFunc func(rs *extensions.ReplicaSet) +// labelPodsWithHash labels all pods in the given podList with the new hash label. +func labelPodsWithHash(podList *api.PodList, c clientset.Interface, namespace, hash string) error { + for _, pod := range podList.Items { + // Only label the pod that doesn't already have the new hash + if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash { + if _, err := updatePodWithRetries(c.Core().Pods(namespace), &pod, func(updated *api.Pod) { + pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + }); err != nil { + return err + } + } + } + return nil +} -func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateFunc) (*extensions.ReplicaSet, error) { +// TODO: use client library instead when it starts to support update retries +// see https://github.com/kubernetes/kubernetes/issues/21479 +type updateRSFunc func(rs *extensions.ReplicaSet) + +func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, error) { var err error oldRs := rs err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { @@ -267,6 +269,25 @@ func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs return rs, err } +type updatePodFunc func(pod *api.Pod) + +func updatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) { + var err error + oldPod := pod + err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { + // Apply the update, then attempt to push it to the apiserver. + applyUpdate(pod) + if pod, err = podClient.Update(pod); err == nil { + return true, nil + } + if pod, err = podClient.Get(oldPod.Name); err != nil { + pod = oldPod + } + return false, nil + }) + return pod, err +} + // Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet. func GetNewReplicaSetTemplate(deployment extensions.Deployment) api.PodTemplateSpec { // newRS will have the same template as in deployment spec, plus a unique label in some cases. From dc78af948e492d2c6c3076b375c0bd8bc0a6e233 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Fri, 19 Feb 2016 10:25:34 -0800 Subject: [PATCH 5/6] Address comments; fix incorrect hash --- pkg/util/deployment/deployment.go | 69 +++++++++++++++++-------------- pkg/util/labels/labels.go | 13 +----- 2 files changed, 41 insertions(+), 41 deletions(-) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 4e1e3a1e281..1b8d8439501 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -58,6 +58,7 @@ func GetOldReplicaSets(deployment extensions.Deployment, c clientset.Interface) }) } +// TODO: switch this to full namespacers type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error) type podListFunc func(string, api.ListOptions) (*api.PodList, error) @@ -136,7 +137,7 @@ func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Inte return nil, nil } -// rsAndPodsWithHashKeySynced returns a list of rs the deployment targets, with pod-template-hash information synced. +// rsAndPodsWithHashKeySynced returns the RSs and pods the given deployment targets, with pod-template-hash information synced. func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, *api.PodList, error) { namespace := deployment.Namespace selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector) @@ -151,7 +152,7 @@ func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.In syncedRSList := []extensions.ReplicaSet{} for _, rs := range rsList { // Add pod-template-hash information if it's not in the RS. - // Otherwise, new RS produced by Deployment will overlap we pre-existing ones + // Otherwise, new RS produced by Deployment will overlap with pre-existing ones // that aren't constrained by the pod-template-hash. syncedRS, err := addHashKeyToRSAndPods(deployment, c, rs, getPodList) if err != nil { @@ -160,26 +161,40 @@ func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.In syncedRSList = append(syncedRSList, *syncedRS) } syncedPodList, err := getPodList(namespace, options) + if err != nil { + return nil, nil, err + } return syncedRSList, syncedPodList, nil } // addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps: -// 1. Add hash label to the rs's pod template +// 1. Add hash label to the rs's pod template, and make sure the controller sees this update so that no orphaned pods will be created // 2. Add hash label to all pods this rs owns // 3. Add hash label to the rs's label and selector -// 4. Add hash label to all pods this rs owns but without the hash label (orphaned pods) -func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) { +func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (updatedRS *extensions.ReplicaSet, err error) { + // If the rs already has the new hash label in its selector, it's done syncing + namespace := deployment.Namespace + hash := fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(api.PodTemplateSpec{ + ObjectMeta: rs.Spec.Template.ObjectMeta, + Spec: rs.Spec.Template.Spec, + })) if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) { return &rs, nil } - namespace := deployment.Namespace - hash := fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(*rs.Spec.Template)) // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. - updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { - updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - }) - if err != nil { - return nil, err + if len(rs.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { + updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { + updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + }) + if err != nil { + return nil, err + } + } + // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). + if updatedRS.Generation != updatedRS.Status.ObservedGeneration { + if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, rs.Name); err != nil { + return nil, err + } } // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. @@ -196,10 +211,8 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa return nil, err } - // 3. Update rs label, rs template label, and rs selector to include the new hash label + // 3. Update rs label and selector to include the new hash label // Copy the old selector, so that we can scrub out any orphaned pods - oldSelector := rs.Spec.Selector - // Update the selector of the rs so it manages all the pods we updated above if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) { updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) @@ -207,25 +220,21 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa return nil, err } - // 4. Label any orphaned pods that don't have the new label, this can happen if the rs manager - // doesn't see the update to its pod template and creates a new pod with the old labels after - // we've finished re-adopting existing pods to the rs. - selector, err = unversioned.LabelSelectorAsSelector(oldSelector) - if err != nil { - return nil, err - } - options = api.ListOptions{LabelSelector: selector} - podList, err = getPodList(namespace, options) - if err != nil { - return nil, err - } - if err = labelPodsWithHash(podList, c, namespace, hash); err != nil { - return nil, err - } + // TODO: look for orphaned pods and label them in the background somewhere else periodically return updatedRS, nil } +func waitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, namespace, name string) error { + return wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) { + rs, err := c.Extensions().ReplicaSets(namespace).Get(name) + if err != nil { + return false, err + } + return rs.Status.ObservedGeneration >= desiredGeneration, nil + }) +} + // labelPodsWithHash labels all pods in the given podList with the new hash label. func labelPodsWithHash(podList *api.PodList, c clientset.Interface, namespace, hash string) error { for _, pod := range podList.Items { diff --git a/pkg/util/labels/labels.go b/pkg/util/labels/labels.go index b4e3f6b89b8..2160e37299b 100644 --- a/pkg/util/labels/labels.go +++ b/pkg/util/labels/labels.go @@ -120,16 +120,7 @@ func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, la return selector } -// SelectorHasLabel checks if the given selector contains the given label key in its MatchLabels or MatchExpressions +// SelectorHasLabel checks if the given selector contains the given label key in its MatchLabels func SelectorHasLabel(selector *unversioned.LabelSelector, labelKey string) bool { - _, found := selector.MatchLabels[labelKey] - if found { - return true - } - for _, exp := range selector.MatchExpressions { - if exp.Key == labelKey && exp.Operator != unversioned.LabelSelectorOpDoesNotExist { - return true - } - } - return false + return len(selector.MatchLabels[labelKey]) > 0 } From 4699a6d8a2c3d12f9efb821772f4201b32887985 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Mon, 22 Feb 2016 10:33:59 -0800 Subject: [PATCH 6/6] Address comments; fix test failure --- pkg/util/deployment/deployment.go | 2 +- pkg/util/deployment/deployment_test.go | 51 ++++++++++++-------------- test/e2e/deployment.go | 1 + 3 files changed, 26 insertions(+), 28 deletions(-) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 1b8d8439501..49b51827082 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -191,7 +191,7 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa } } // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). - if updatedRS.Generation != updatedRS.Status.ObservedGeneration { + if updatedRS.Generation > updatedRS.Status.ObservedGeneration { if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, rs.Name); err != nil { return nil, err } diff --git a/pkg/util/deployment/deployment_test.go b/pkg/util/deployment/deployment_test.go index cab72222166..9562f053ed7 100644 --- a/pkg/util/deployment/deployment_test.go +++ b/pkg/util/deployment/deployment_test.go @@ -22,14 +22,11 @@ import ( "time" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/apis/extensions" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/testing/core" - "k8s.io/kubernetes/pkg/client/testing/fake" "k8s.io/kubernetes/pkg/client/unversioned/testclient" - "k8s.io/kubernetes/pkg/client/unversioned/testclient/simple" "k8s.io/kubernetes/pkg/runtime" ) @@ -223,47 +220,47 @@ func TestGetNewRC(t *testing.T) { tests := []struct { test string - rsList extensions.ReplicaSetList + objs []runtime.Object expected *extensions.ReplicaSet }{ { "No new ReplicaSet", - extensions.ReplicaSetList{ - Items: []extensions.ReplicaSet{ - generateRS(generateDeployment("foo")), - generateRS(generateDeployment("bar")), + []runtime.Object{ + &api.PodList{}, + &extensions.ReplicaSetList{ + Items: []extensions.ReplicaSet{ + generateRS(generateDeployment("foo")), + generateRS(generateDeployment("bar")), + }, }, }, nil, }, { "Has new ReplicaSet", - extensions.ReplicaSetList{ - Items: []extensions.ReplicaSet{ - generateRS(generateDeployment("foo")), - generateRS(generateDeployment("bar")), - generateRS(generateDeployment("abc")), - newRC, - generateRS(generateDeployment("xyz")), + []runtime.Object{ + &api.PodList{}, + &extensions.ReplicaSetList{ + Items: []extensions.ReplicaSet{ + generateRS(generateDeployment("foo")), + generateRS(generateDeployment("bar")), + generateRS(generateDeployment("abc")), + newRC, + generateRS(generateDeployment("xyz")), + }, }, }, &newRC, }, } - ns := api.NamespaceDefault for _, test := range tests { - c := &simple.Client{ - Request: simple.Request{ - Method: "GET", - Path: testapi.Default.ResourcePath("replicaSets", ns, ""), - }, - Response: simple.Response{ - StatusCode: 200, - Body: &test.rsList, - }, - } - rs, err := GetNewReplicaSet(newDeployment, c.Setup(t).Clientset) + fakeClient := &fake.Clientset{} + fakeClient = addListPodsReactor(fakeClient, test.objs[0]) + fakeClient = addListRSReactor(fakeClient, test.objs[1]) + fakeClient = addUpdatePodsReactor(fakeClient) + fakeClient = addUpdateRSReactor(fakeClient) + rs, err := GetNewReplicaSet(newDeployment, fakeClient) if err != nil { t.Errorf("In test case %s, got unexpected error %v", test.test, err) } diff --git a/test/e2e/deployment.go b/test/e2e/deployment.go index 3d280db4f6b..e07fae8fa8c 100644 --- a/test/e2e/deployment.go +++ b/test/e2e/deployment.go @@ -849,6 +849,7 @@ func testDeploymentLabelAdopted(f *Framework) { Expect(c.Extensions().ReplicaSets(ns).Delete(newRS.Name, nil)).NotTo(HaveOccurred()) }() + // The RS and pods should be relabeled before the status is updated by syncRollingUpdateDeployment err = waitForDeploymentStatus(c, ns, deploymentName, replicas, replicas-1, replicas+1, 0) Expect(err).NotTo(HaveOccurred())