Address comments

This commit is contained in:
Janet Kuo 2016-02-18 11:45:24 -08:00
parent 14bab2bb3a
commit 0e5da8460d

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/apis/extensions"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedcore "k8s.io/kubernetes/pkg/client/typed/generated/core/unversioned"
unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/integer"
@ -163,16 +164,25 @@ func rsAndPodsWithHashKeySynced(deployment extensions.Deployment, c clientset.In
}
// addHashKeyToRSAndPods adds pod-template-hash information to the given rs, if it's not already there, with the following steps:
// 1. Add hash label to all pods this rs owns
// 2. Add hash label to the rs's pod template, the rs's label, and the rs's selector
// 3. Clean up all pods this rs owns but without the hash label (orphaned pods)
// 1. Add hash label to the rs's pod template
// 2. Add hash label to all pods this rs owns
// 3. Add hash label to the rs's label and selector
// 4. Add hash label to all pods this rs owns but without the hash label (orphaned pods)
func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) {
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
return &rs, nil
}
namespace := deployment.Namespace
hash := fmt.Sprintf("%d", podutil.GetPodTemplateSpecHash(*rs.Spec.Template))
// 1. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) {
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
})
if err != nil {
return nil, err
}
// 2. Update all pods managed by the rs to have the new hash label, so they will be correctly adopted.
selector, err := unversioned.LabelSelectorAsSelector(rs.Spec.Selector)
if err != nil {
return nil, err
@ -182,46 +192,22 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa
if err != nil {
return nil, err
}
for _, pod := range podList.Items {
// If the pod already has the new hash label, avoid re-labeling it
if len(pod.Labels) > 0 && len(pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]) > 0 {
continue
}
pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
delay, maxRetries := 3, 3
podName := pod.Name
for i := 0; i < maxRetries; i++ {
_, err = c.Core().Pods(namespace).Update(&pod)
if err == nil {
break
}
time.Sleep(time.Second * time.Duration(delay))
delay *= delay
getPod, err := c.Core().Pods(namespace).Get(podName)
if err != nil {
return nil, err
}
pod = *getPod
}
if err != nil {
return nil, err
}
}
// 2. Update rs label, rs template label, and rs selector to include the new hash label
// Copy the old selector, so that we can scrub out any orphaned pods
oldSelector := rs.Spec.Selector
// Update the selector of the rs so it manages all the pods we updated above
updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) {
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
})
if err != nil {
if err = labelPodsWithHash(podList, c, namespace, hash); err != nil {
return nil, err
}
// 3. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager
// 3. Update rs label, rs template label, and rs selector to include the new hash label
// Copy the old selector, so that we can scrub out any orphaned pods
oldSelector := rs.Spec.Selector
// Update the selector of the rs so it manages all the pods we updated above
if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) {
updated.Labels = labelsutil.AddLabel(updated.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
return nil, err
}
// 4. Label any orphaned pods that don't have the new label, this can happen if the rs manager
// doesn't see the update to its pod template and creates a new pod with the old labels after
// we've finished re-adopting existing pods to the rs.
selector, err = unversioned.LabelSelectorAsSelector(oldSelector)
@ -230,20 +216,36 @@ func addHashKeyToRSAndPods(deployment extensions.Deployment, c clientset.Interfa
}
options = api.ListOptions{LabelSelector: selector}
podList, err = getPodList(namespace, options)
for _, pod := range podList.Items {
if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hash {
if err := c.Core().Pods(namespace).Delete(pod.Name, nil); err != nil {
return nil, err
}
}
if err != nil {
return nil, err
}
if err = labelPodsWithHash(podList, c, namespace, hash); err != nil {
return nil, err
}
return updatedRS, nil
}
type updateFunc func(rs *extensions.ReplicaSet)
// labelPodsWithHash labels all pods in the given podList with the new hash label.
func labelPodsWithHash(podList *api.PodList, c clientset.Interface, namespace, hash string) error {
for _, pod := range podList.Items {
// Only label the pod that doesn't already have the new hash
if pod.Labels[extensions.DefaultDeploymentUniqueLabelKey] != hash {
if _, err := updatePodWithRetries(c.Core().Pods(namespace), &pod, func(updated *api.Pod) {
pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
}); err != nil {
return err
}
}
}
return nil
}
func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateFunc) (*extensions.ReplicaSet, error) {
// TODO: use client library instead when it starts to support update retries
// see https://github.com/kubernetes/kubernetes/issues/21479
type updateRSFunc func(rs *extensions.ReplicaSet)
func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateRSFunc) (*extensions.ReplicaSet, error) {
var err error
oldRs := rs
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
@ -267,6 +269,25 @@ func updateRSWithRetries(rsClient unversionedextensions.ReplicaSetInterface, rs
return rs, err
}
type updatePodFunc func(pod *api.Pod)
func updatePodWithRetries(podClient unversionedcore.PodInterface, pod *api.Pod, applyUpdate updatePodFunc) (*api.Pod, error) {
var err error
oldPod := pod
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
// Apply the update, then attempt to push it to the apiserver.
applyUpdate(pod)
if pod, err = podClient.Update(pod); err == nil {
return true, nil
}
if pod, err = podClient.Get(oldPod.Name); err != nil {
pod = oldPod
}
return false, nil
})
return pod, err
}
// Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.
func GetNewReplicaSetTemplate(deployment extensions.Deployment) api.PodTemplateSpec {
// newRS will have the same template as in deployment spec, plus a unique label in some cases.