Ignore not found error when retrying rs and pods updates

This commit is contained in:
Janet Kuo
2016-03-02 12:52:12 -08:00
parent b05cf6d53a
commit 75e570832b
3 changed files with 158 additions and 70 deletions

View File

@@ -27,13 +27,12 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/integer"
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
podutil "k8s.io/kubernetes/pkg/util/pod"
rsutil "k8s.io/kubernetes/pkg/util/replicaset"
"k8s.io/kubernetes/pkg/util/wait"
)
@@ -186,21 +185,28 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf
ObjectMeta: meta,
Spec: rs.Spec.Template.Spec,
}))
rsUpdated := false
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
if len(updatedRS.Spec.Template.Labels[extensions.DefaultDeploymentUniqueLabelKey]) == 0 {
updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
})
if err != nil {
return nil, fmt.Errorf("error updating rs %s pod template label with template hash: %v", updatedRS.Name, err)
return nil, fmt.Errorf("error updating %s %s/%s pod template label with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
}
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("error waiting for rs %s generation %d observed by controller: %v", updatedRS.Name, updatedRS.Generation, err)
if rsUpdated {
// Make sure rs pod template is updated so that it won't create pods without the new label (orphaned pods).
if updatedRS.Generation > updatedRS.Status.ObservedGeneration {
if err = waitForReplicaSetUpdated(c, updatedRS.Generation, namespace, updatedRS.Name); err != nil {
return nil, fmt.Errorf("error waiting for %s %s/%s generation %d observed by controller: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, updatedRS.Generation, err)
}
}
glog.V(4).Infof("Observed the update of %s %s/%s's pod template with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
} else {
// If the RS wasn't updated and step 1 returned no error, the update hit an RS not-found error.
// Return here and retry in the next sync loop.
return &rs, nil
}
glog.V(4).Infof("Observed the update of rs %s's pod template with hash %s.", rs.Name, hash)
}
// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
@@ -213,20 +219,28 @@ func addHashKeyToRSAndPods(deployment *extensions.Deployment, c clientset.Interf
if err != nil {
return nil, fmt.Errorf("error in getting pod list for namespace %s and list options %+v: %s", namespace, options, err)
}
if err = labelPodsWithHash(podList, c, namespace, hash); err != nil {
allPodsLabeled := false
if allPodsLabeled, err = labelPodsWithHash(podList, updatedRS, c, namespace, hash); err != nil {
return nil, fmt.Errorf("error in adding template hash label %s to pods %+v: %s", hash, podList, err)
}
glog.V(4).Infof("Labeled rs %s's pods with hash %s.", rs.Name, hash)
// If not every pod was labeled and step 2 returned no error, at least one pod update hit a not-found error.
// Return here and retry in the next sync loop.
if !allPodsLabeled {
return updatedRS, nil
}
// 3. Update rs label and selector to include the new hash label
// Copy the old selector, so that we can scrub out any orphaned pods
if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
if updatedRS, rsUpdated, err = rsutil.UpdateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
return nil, fmt.Errorf("error updating rs %s label and selector with template hash: %v", updatedRS.Name, err)
return nil, fmt.Errorf("error updating %s %s/%s label and selector with template hash: %v", updatedRS.Kind, updatedRS.Namespace, updatedRS.Name, err)
}
glog.V(4).Infof("Updated rs %s's selector and label with hash %s.", rs.Name, hash)
if rsUpdated {
glog.V(4).Infof("Updated %s %s/%s's selector and label with hash %s.", rs.Kind, rs.Namespace, rs.Name, hash)
}
// If the RS isn't actually updated in step 3, that's okay, we'll retry in the next sync loop since its selector isn't updated yet.
// TODO: look for orphaned pods and label them in the background somewhere else periodically
@@ -244,70 +258,26 @@ func waitForReplicaSetUpdated(c clientset.Interface, desiredGeneration int64, na
}
// labelPodsWithHash labels all pods in the given podList with the new hash label.
func labelPodsWithHash(podList *api.PodList, c clientset.Interface, namespace, hash string) error {
// The returned bool value can be used to tell if all pods are actually labeled.
func labelPodsWithHash(podList *api.PodList, rs *extensions.ReplicaSet, c clientset.Interface, namespace, hash string) (bool, error) {
allPodsLabeled := true
for _, pod := range podList.Items {
// Only label the pod that doesn't already have the new hash
if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
if _, err := updatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) {
if _, podUpdated, err := podutil.UpdatePodWithRetries(c.Core().Pods(namespace), &pod, func(podToUpdate *api.Pod) {
podToUpdate.Labels = labelsutil.AddLabel(podToUpdate.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
return fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err)
return false, fmt.Errorf("error in adding template hash label %s to pod %+v: %s", hash, pod, err)
} else if podUpdated {
glog.V(4).Infof("Labeled %s %s/%s of %s %s/%s with hash %s.", pod.Kind, pod.Namespace, pod.Name, rs.Kind, rs.Namespace, rs.Name, hash)
} else {
// If the pod wasn't updated and the update attempt returned no error, we've hit a pod not-found error,
// so we can't say that all pods are labeled.
allPodsLabeled = false
}
glog.V(4).Infof("Labeled pod %s with hash %s.", pod.Name, hash)
}
}
return nil
}
// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479

// updateRSFunc is a callback that mutates the given ReplicaSet in place.
// updateRSWithRetries invokes it on a freshly fetched copy before every
// Update attempt.
type updateRSFunc func(rs *extensions.ReplicaSet)

// updateRSWithRetries repeatedly fetches the ReplicaSet named rs.Name through
// rsClient, applies applyUpdate to the fetched object, and tries to write it
// back, polling every 10ms for up to 1 minute.
//
// Parameters:
//   - rsClient: client used for both the Get and the Update calls.
//   - rs: only its Name is read, to identify the object to fetch; the variable
//     itself is overwritten with the results of Get and Update.
//   - applyUpdate: mutation applied to the fetched object before each Update.
//
// Returns the object produced by the last Get/Update call together with the
// error reported by wait.Poll. A Get failure ends the poll with that error;
// an Update failure of any kind is retried until the poll times out.
func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, error) {
var err error
// Keep the caller's object so its name is still available after rs is reassigned below.
oldRs := rs
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
// Re-read the object on every attempt so the update is applied to the latest version.
rs, err = rsClient.Get(oldRs.Name)
if err != nil {
// A failed Get is returned from the condition, which stops the poll.
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(rs)
if rs, err = rsClient.Update(rs); err == nil {
// Update successful.
return true, nil
}
// Update could have failed due to conflict error. Try again.
// NOTE(review): every Update error is retried here, not only conflicts, and
// rs now holds whatever Update returned alongside the error — confirm intended.
return false, nil
})
// If the error is non-nil the returned ReplicaSet cannot be trusted; if it is nil, the returned
// ReplicaSet contains the applied update.
return rs, err
}
type updatePodFunc func(pod *api.Pod)
func updatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
var err error
oldPod := pod
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
pod, err = podClient.Get(oldPod.Name)
if err != nil {
return false, err
}
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(pod)
if pod, err = podClient.Update(pod); err == nil {
// Update successful.
return true, nil
}
// Update could have failed due to conflict error. Try again.
return false, nil
})
if err == wait.ErrWaitTimeout {
return nil, fmt.Errorf("timed out trying to update pod: %+v", oldPod)
}
return pod, err
return allPodsLabeled, nil
}
// Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.