From aae2073a787ae9f4e631f2593abc13daa08327f6 Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Wed, 2 Mar 2016 15:06:00 -0800 Subject: [PATCH 1/2] Add preconditions to pod/rs update retry func --- pkg/util/deployment/deployment.go | 61 ++++++++++++++++++------------- pkg/util/pod/pod.go | 10 +++-- pkg/util/replicaset/replicaset.go | 10 +++-- 3 files changed, 50 insertions(+), 31 deletions(-) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 957029c5748..5728e1d87f6 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -187,27 +187,30 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf })) rsUpdated := false // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. - if len(updatedRS.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { - updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { + updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, + func(updated *extensions.ReplicaSet) bool { + return updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash + }, + func(updated *extensions.ReplicaSet) { updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) }) - if err != nil { - return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) - } - if rsUpdated { - // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). - if updatedRS.Generation > updatedRS.Status.ObservedGeneration { - if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil { - return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) - } - } - glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) - } else { - // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. - // Return here and retry in the next sync loop. - return &rs, nil - } + if err != nil { + return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) } + if rsUpdated { + // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). + if updatedRS.Generation > updatedRS.Status.ObservedGeneration { + if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil { + return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) + } + } + glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) + } else { + // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. + // Return here and retry in the next sync loop. + return &rs, nil + } + glog.V(4).Infof("Observed the update of rs %s's pod template with hash %s.", rs.Name, hash) // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) @@ -231,10 +234,14 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf // 3. Update rs label and selector to include the new hash label // Copy the old selector, so that we can scrub out any orphaned pods - if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { - updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) - }); err != nil { + if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, + func(updated *extensions.ReplicaSet) bool { + return updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash || updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] != hash + }, + func(updated *extensions.ReplicaSet) { + updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) + }); err != nil { return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) } if rsUpdated { @@ -264,9 +271,13 @@ func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c client for _, pod := range podList.Items { // Only label the pod that doesn't already have the new hash if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash { - if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) { - podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - }); err != nil { + if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, + func(podToUpdate *api.Pod) bool { + return podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash + }, + func(podToUpdate *api.Pod) { + podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + }); err != nil { return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err) } else if podUpdated { glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash) diff --git a/pkg/util/pod/pod.go b/pkg/util/pod/pod.go index 8f755473bb0..f8f63675134 100644 --- a/pkg/util/pod/pod.go +++ b/pkg/util/pod/pod.go @@ -39,10 +39,11 @@ func GetPodTemplateSpecHash(template api.PodTemplateSpec) uint32 { // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 type updatePodFunc func(pod *api.Pod) +type preconditionFunc func(pod *api.Pod) bool -// UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored. +// UpdatePodWithRetries updates a pod with given applyUpdate function, when the given precondition holds. Note that pod not found error is ignored. // The returned bool value can be used to tell if the pod is actually updated. -func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, bool, error) { +func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, preconditionHold preconditionFunc, applyUpdate updatePodFunc) (*api.Pod, bool, error) { var err error var podUpdated bool oldPod := pod @@ -51,8 +52,11 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, if err != nil { return false, err } + if !preconditionHold(pod) { + glog.V(4).Infof("pod %s precondition doesn't hold, skip updating it.", pod.Name) + return true, nil + } // Apply the update, then attempt to push it to the apiserver. - // TODO: add precondition for update applyUpdate(pod) if pod, err = podClient.Update(pod); err == nil { // Update successful. diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go index 3859f20186e..9f182198053 100644 --- a/pkg/util/replicaset/replicaset.go +++ b/pkg/util/replicaset/replicaset.go @@ -30,10 +30,11 @@ import ( // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 type updateRSFunc func(rs *extensions.ReplicaSet) +type preconditionFunc func(rs *extensions.ReplicaSet) bool -// UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored. +// UpdateRSWithRetries updates a RS with given applyUpdate function, when the given precondition holds. Note that RS not found error is ignored. // The returned bool value can be used to tell if the RS is actually updated. -func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) { +func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, preconditionHold preconditionFunc, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) { var err error var rsUpdated bool oldRs := rs @@ -42,8 +43,11 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs if err != nil { return false, err } + if !preconditionHold(rs) { + glog.V(4).Infof("rs %s precondition doesn't hold, skip updating it.", rs.Name) + return true, nil + } // Apply the update, then attempt to push it to the apiserver. - // TODO: add precondition for update applyUpdate(rs) if rs, err = rsClient.Update(rs); err == nil { // Update successful. From 64731e8083f86b16f51f57f19a692b61167796ec Mon Sep 17 00:00:00 2001 From: Janet Kuo Date: Thu, 3 Mar 2016 15:48:56 -0800 Subject: [PATCH 2/2] Address comments --- pkg/util/deployment/deployment.go | 33 +++++++++++++++++++------------ pkg/util/errors/errors.go | 8 +++++++- pkg/util/pod/pod.go | 26 +++++++++++++++--------- pkg/util/replicaset/replicaset.go | 27 ++++++++++++++----------- 4 files changed, 60 insertions(+), 34 deletions(-) diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 5728e1d87f6..dde51134003 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/apis/extensions" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/integer" intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" @@ -188,11 +189,13 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf rsUpdated := false // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, - func(updated *extensions.ReplicaSet) bool { - return updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash - }, - func(updated *extensions.ReplicaSet) { + func(updated *extensions.ReplicaSet) error { + // Precondition: the RS doesn't contain the new hash in its pod template label. + if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated + } updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil }) if err != nil { return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) @@ -235,12 +238,14 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf // 3. Update rs label and selector to include the new hash label // Copy the old selector, so that we can scrub out any orphaned pods if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, - func(updated *extensions.ReplicaSet) bool { - return updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash || updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] != hash - }, - func(updated *extensions.ReplicaSet) { + func(updated *extensions.ReplicaSet) error { + // Precondition: the RS doesn't contain the new hash in its label or selector. + if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated + } updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil }); err != nil { return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) } @@ -272,17 +277,19 @@ func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c client // Only label the pod that doesn't already have the new hash if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash { if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, - func(podToUpdate *api.Pod) bool { - return podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash - }, - func(podToUpdate *api.Pod) { + func(podToUpdate *api.Pod) error { + // Precondition: the pod doesn't contain the new hash in its label. + if podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated + } podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil }); err != nil { return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err) } else if podUpdated { glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash) } else { - // If the pod wasn't updated but didn't return error when we try to update it, we've hit a pod not found error. + // If the pod wasn't updated but didn't return error when we try to update it, we've hit "pod not found" or "precondition violated" error. // Then we can't say all pods are labeled allPodsLabeled = false } diff --git a/pkg/util/errors/errors.go b/pkg/util/errors/errors.go index a1a8e7aa24c..df3adaf3e8b 100644 --- a/pkg/util/errors/errors.go +++ b/pkg/util/errors/errors.go @@ -16,7 +16,10 @@ limitations under the License. package errors -import "fmt" +import ( + "errors" + "fmt" +) // Aggregate represents an object that contains multiple errors, but does not // necessarily have singular semantic meaning. @@ -148,3 +151,6 @@ func AggregateGoroutines(funcs ...func() error) Aggregate { } return NewAggregate(errs) } + +// ErrPreconditionViolated is returned when the precondition is violated +var ErrPreconditionViolated = errors.New("precondition is violated") diff --git a/pkg/util/pod/pod.go b/pkg/util/pod/pod.go index f8f63675134..8fb5cadd0b1 100644 --- a/pkg/util/pod/pod.go +++ b/pkg/util/pod/pod.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" + errorsutil "k8s.io/kubernetes/pkg/util/errors" hashutil "k8s.io/kubernetes/pkg/util/hash" "k8s.io/kubernetes/pkg/util/wait" ) @@ -38,12 +39,11 @@ func GetPodTemplateSpecHash(template api.PodTemplateSpec) uint32 { // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 -type updatePodFunc func(pod *api.Pod) -type preconditionFunc func(pod *api.Pod) bool +type updatePodFunc func(pod *api.Pod) error -// UpdatePodWithRetries updates a pod with given applyUpdate function, when the given precondition holds. Note that pod not found error is ignored. +// UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored. // The returned bool value can be used to tell if the pod is actually updated. -func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, preconditionHold preconditionFunc, applyUpdate updatePodFunc) (*api.Pod, bool, error) { +func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, bool, error) { var err error var podUpdated bool oldPod := pod @@ -52,12 +52,10 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, if err != nil { return false, err } - if !preconditionHold(pod) { - glog.V(4).Infof("pod %s precondition doesn't hold, skip updating it.", pod.Name) - return true, nil - } // Apply the update, then attempt to push it to the apiserver. - applyUpdate(pod) + if err = applyUpdate(pod); err != nil { + return false, err + } if pod, err = podClient.Update(pod); err == nil { // Update successful. return true, nil @@ -70,12 +68,22 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, podUpdated = true } + // Handle returned error from wait poll if err == wait.ErrWaitTimeout { err = fmt.Errorf("timed out trying to update pod: %+v", oldPod) } + // Ignore the pod not found error, but the pod isn't updated. if errors.IsNotFound(err) { glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name) err = nil } + // Ignore the precondition violated error, but the pod isn't updated. + if err == errorsutil.ErrPreconditionViolated { + glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name) + err = nil + } + + // If the error is non-nil the returned pod cannot be trusted; if podUpdated is false, the pod isn't updated; + // if the error is nil and podUpdated is true, the returned pod contains the applied update. return pod, podUpdated, err } diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go index 9f182198053..218298a5158 100644 --- a/pkg/util/replicaset/replicaset.go +++ b/pkg/util/replicaset/replicaset.go @@ -24,17 +24,17 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/apis/extensions" unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" + errorsutil "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/wait" ) // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 -type updateRSFunc func(rs *extensions.ReplicaSet) -type preconditionFunc func(rs *extensions.ReplicaSet) bool +type updateRSFunc func(rs *extensions.ReplicaSet) error -// UpdateRSWithRetries updates a RS with given applyUpdate function, when the given precondition holds. Note that RS not found error is ignored. +// UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored. // The returned bool value can be used to tell if the RS is actually updated. -func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, preconditionHold preconditionFunc, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) { +func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, bool, error) { var err error var rsUpdated bool oldRs := rs @@ -43,12 +43,10 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs if err != nil { return false, err } - if !preconditionHold(rs) { - glog.V(4).Infof("rs %s precondition doesn't hold, skip updating it.", rs.Name) - return true, nil - } // Apply the update, then attempt to push it to the apiserver. - applyUpdate(rs) + if err = applyUpdate(rs); err != nil { + return false, err + } if rs, err = rsClient.Update(rs); err == nil { // Update successful. return true, nil @@ -61,6 +59,7 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs rsUpdated = true } + // Handle returned error from wait poll if err == wait.ErrWaitTimeout { err = fmt.Errorf("timed out trying to update RS: %+v", oldRs) } @@ -69,7 +68,13 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name) err = nil } - // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned - // controller contains the applied update. + // Ignore the precondition violated error, but the RS isn't updated. + if err == errorsutil.ErrPreconditionViolated { + glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name) + err = nil + } + + // If the error is non-nil the returned RS cannot be trusted; if rsUpdated is false, the contoller isn't updated; + // if the error is nil and rsUpdated is true, the returned RS contains the applied update. return rs, rsUpdated, err }