diff --git a/pkg/util/deployment/deployment.go b/pkg/util/deployment/deployment.go index 244049b027c..6ad5e6bf6d9 100644 --- a/pkg/util/deployment/deployment.go +++ b/pkg/util/deployment/deployment.go @@ -29,6 +29,7 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/integer" intstrutil "k8s.io/kubernetes/pkg/util/intstr" labelsutil "k8s.io/kubernetes/pkg/util/labels" @@ -188,27 +189,32 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf })) rsUpdated := false // 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label. - if len(updatedRS.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 { - updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { - updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - }) - if err != nil { - return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) - } - if rsUpdated { - // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). - if updatedRS.Generation > updatedRS.Status.ObservedGeneration { - if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil { - return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) - } + updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, + func(updated *extensions.ReplicaSet) error { + // Precondition: the RS doesn't contain the new hash in its pod template label. + if updated.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated } - glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) - } else { - // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. - // Return here and retry in the next sync loop. - return &rs, nil - } + updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil + }) + if err != nil { + return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) } + if rsUpdated { + // Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods). + if updatedRS.Generation > updatedRS.Status.ObservedGeneration { + if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil { + return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err) + } + } + glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash) + } else { + // If RS wasn't updated but didn't return error in step 1, we've hit a RS not found error. + // Return here and retry in the next sync loop. + return &rs, nil + } + glog.V(4).Infof("Observed the update of rs %s's pod template with hash %s.", rs.Name, hash) // 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted. selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector) @@ -232,10 +238,16 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf // 3. Update rs label and selector to include the new hash label // Copy the old selector, so that we can scrub out any orphaned pods - if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) { - updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) - }); err != nil { + if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, + func(updated *extensions.ReplicaSet) error { + // Precondition: the RS doesn't contain the new hash in its label or selector. + if updated.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash && updated.Spec.Selector.MatchLabels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated + } + updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil + }); err != nil { return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err) } if rsUpdated { @@ -265,14 +277,20 @@ func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c client for _, pod := range podList.Items { // Only label the pod that doesn't already have the new hash if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash { - if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) { - podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) - }); err != nil { + if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, + func(podToUpdate *api.Pod) error { + // Precondition: the pod doesn't contain the new hash in its label. + if podToUpdate.Labels[extensions.DefaultDeploymentUniqueLabelKey] == hash { + return errors.ErrPreconditionViolated + } + podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash) + return nil + }); err != nil { return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err) } else if podUpdated { glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash) } else { - // If the pod wasn't updated but didn't return error when we try to update it, we've hit a pod not found error. + // If the pod wasn't updated but didn't return error when we try to update it, we've hit "pod not found" or "precondition violated" error. // Then we can't say all pods are labeled allPodsLabeled = false } diff --git a/pkg/util/errors/errors.go b/pkg/util/errors/errors.go index a1a8e7aa24c..df3adaf3e8b 100644 --- a/pkg/util/errors/errors.go +++ b/pkg/util/errors/errors.go @@ -16,7 +16,10 @@ limitations under the License. package errors -import "fmt" +import ( + "errors" + "fmt" +) // Aggregate represents an object that contains multiple errors, but does not // necessarily have singular semantic meaning. @@ -148,3 +151,6 @@ func AggregateGoroutines(funcs ...func() error) Aggregate { } return NewAggregate(errs) } + +// ErrPreconditionViolated is returned when the precondition is violated +var ErrPreconditionViolated = errors.New("precondition is violated") diff --git a/pkg/util/pod/pod.go b/pkg/util/pod/pod.go index 8f755473bb0..8fb5cadd0b1 100644 --- a/pkg/util/pod/pod.go +++ b/pkg/util/pod/pod.go @@ -26,6 +26,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned" + errorsutil "k8s.io/kubernetes/pkg/util/errors" hashutil "k8s.io/kubernetes/pkg/util/hash" "k8s.io/kubernetes/pkg/util/wait" ) @@ -38,7 +39,7 @@ func GetPodTemplateSpecHash(template api.PodTemplateSpec) uint32 { // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 -type updatePodFunc func(pod *api.Pod) +type updatePodFunc func(pod *api.Pod) error // UpdatePodWithRetries updates a pod with given applyUpdate function. Note that pod not found error is ignored. // The returned bool value can be used to tell if the pod is actually updated. @@ -52,8 +53,9 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, return false, err } // Apply the update, then attempt to push it to the apiserver. - // TODO: add precondition for update - applyUpdate(pod) + if err = applyUpdate(pod); err != nil { + return false, err + } if pod, err = podClient.Update(pod); err == nil { // Update successful. return true, nil @@ -66,12 +68,22 @@ func UpdatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, podUpdated = true } + // Handle returned error from wait poll if err == wait.ErrWaitTimeout { err = fmt.Errorf("timed out trying to update pod: %+v", oldPod) } + // Ignore the pod not found error, but the pod isn't updated. if errors.IsNotFound(err) { glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name) err = nil } + // Ignore the precondition violated error, but the pod isn't updated. + if err == errorsutil.ErrPreconditionViolated { + glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldPod.Kind, oldPod.Namespace, oldPod.Name) + err = nil + } + + // If the error is non-nil the returned pod cannot be trusted; if podUpdated is false, the pod isn't updated; + // if the error is nil and podUpdated is true, the returned pod contains the applied update. return pod, podUpdated, err } diff --git a/pkg/util/replicaset/replicaset.go b/pkg/util/replicaset/replicaset.go index 3859f20186e..218298a5158 100644 --- a/pkg/util/replicaset/replicaset.go +++ b/pkg/util/replicaset/replicaset.go @@ -24,12 +24,13 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/apis/extensions" unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" + errorsutil "k8s.io/kubernetes/pkg/util/errors" "k8s.io/kubernetes/pkg/util/wait" ) // TODO: use client library instead when it starts to support update retries // see https://github.com/kubernetes/kubernetes/issues/21479 -type updateRSFunc func(rs *extensions.ReplicaSet) +type updateRSFunc func(rs *extensions.ReplicaSet) error // UpdateRSWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored. // The returned bool value can be used to tell if the RS is actually updated. @@ -43,8 +44,9 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs return false, err } // Apply the update, then attempt to push it to the apiserver. - // TODO: add precondition for update - applyUpdate(rs) + if err = applyUpdate(rs); err != nil { + return false, err + } if rs, err = rsClient.Update(rs); err == nil { // Update successful. return true, nil @@ -57,6 +59,7 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs rsUpdated = true } + // Handle returned error from wait poll if err == wait.ErrWaitTimeout { err = fmt.Errorf("timed out trying to update RS: %+v", oldRs) } @@ -65,7 +68,13 @@ func UpdateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs glog.V(4).Infof("%s %s/%s is not found, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name) err = nil } - // If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned - // controller contains the applied update. + // Ignore the precondition violated error, but the RS isn't updated. + if err == errorsutil.ErrPreconditionViolated { + glog.V(4).Infof("%s %s/%s precondition doesn't hold, skip updating it.", oldRs.Kind, oldRs.Namespace, oldRs.Name) + err = nil + } + + // If the error is non-nil the returned RS cannot be trusted; if rsUpdated is false, the contoller isn't updated; + // if the error is nil and rsUpdated is true, the returned RS contains the applied update. return rs, rsUpdated, err }