mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-29 14:37:00 +00:00
Ensure Deployment labels adopted ReplicaSets and pods
This commit is contained in:
parent
3042f1d1c8
commit
da58172283
@ -665,6 +665,10 @@ func (dc *DeploymentController) getNewReplicaSet(deployment extensions.Deploymen
|
|||||||
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
|
newRevision := strconv.FormatInt(maxOldRevision+1, 10)
|
||||||
|
|
||||||
existingNewRS, err := deploymentutil.GetNewReplicaSetFromList(deployment, dc.client,
|
existingNewRS, err := deploymentutil.GetNewReplicaSetFromList(deployment, dc.client,
|
||||||
|
func(namespace string, options api.ListOptions) (*api.PodList, error) {
|
||||||
|
podList, err := dc.podStore.Pods(namespace).List(options.LabelSelector)
|
||||||
|
return &podList, err
|
||||||
|
},
|
||||||
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
|
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
|
||||||
return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
|
return dc.rsStore.ReplicaSets(namespace).List(options.LabelSelector)
|
||||||
})
|
})
|
||||||
|
@ -25,11 +25,13 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/apis/extensions"
|
"k8s.io/kubernetes/pkg/apis/extensions"
|
||||||
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
|
||||||
|
extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned"
|
||||||
"k8s.io/kubernetes/pkg/labels"
|
"k8s.io/kubernetes/pkg/labels"
|
||||||
"k8s.io/kubernetes/pkg/util/integer"
|
"k8s.io/kubernetes/pkg/util/integer"
|
||||||
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
|
intstrutil "k8s.io/kubernetes/pkg/util/intstr"
|
||||||
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
labelsutil "k8s.io/kubernetes/pkg/util/labels"
|
||||||
podutil "k8s.io/kubernetes/pkg/util/pod"
|
podutil "k8s.io/kubernetes/pkg/util/pod"
|
||||||
|
"k8s.io/kubernetes/pkg/util/wait"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -55,9 +57,12 @@ func GetOldReplicaSets(deployment extensions.Deployment, c clientset.Interface)
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type rsListFunc func(string, api.ListOptions) ([]extensions.ReplicaSet, error)
|
||||||
|
type podListFunc func(string, api.ListOptions) (*api.PodList, error)
|
||||||
|
|
||||||
// GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions.
|
// GetOldReplicaSetsFromLists returns two sets of old replica sets targeted by the given Deployment; get PodList and ReplicaSetList with input functions.
|
||||||
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
|
// Note that the first set of old replica sets doesn't include the ones with no pods, and the second set of old replica sets include all old replica sets.
|
||||||
func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList func(string, api.ListOptions) (*api.PodList, error), getRSList func(string, api.ListOptions) ([]extensions.ReplicaSet, error)) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
|
func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) ([]*extensions.ReplicaSet, []*extensions.ReplicaSet, error) {
|
||||||
namespace := deployment.ObjectMeta.Namespace
|
namespace := deployment.ObjectMeta.Namespace
|
||||||
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
|
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -74,7 +79,7 @@ func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.In
|
|||||||
// TODO: Right now we list all replica sets and then filter. We should add an API for this.
|
// TODO: Right now we list all replica sets and then filter. We should add an API for this.
|
||||||
oldRSs := map[string]extensions.ReplicaSet{}
|
oldRSs := map[string]extensions.ReplicaSet{}
|
||||||
allOldRSs := map[string]extensions.ReplicaSet{}
|
allOldRSs := map[string]extensions.ReplicaSet{}
|
||||||
rsList, err := getRSList(namespace, options)
|
rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, fmt.Errorf("error listing replica sets: %v", err)
|
return nil, nil, fmt.Errorf("error listing replica sets: %v", err)
|
||||||
}
|
}
|
||||||
@ -113,6 +118,9 @@ func GetOldReplicaSetsFromLists(deployment extensions.Deployment, c clientset.In
|
|||||||
// Returns nil if the new replica set doesn't exist yet.
|
// Returns nil if the new replica set doesn't exist yet.
|
||||||
func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) {
|
func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (*extensions.ReplicaSet, error) {
|
||||||
return GetNewReplicaSetFromList(deployment, c,
|
return GetNewReplicaSetFromList(deployment, c,
|
||||||
|
func(namespace string, options api.ListOptions) (*api.PodList, error) {
|
||||||
|
return c.Core().Pods(namespace).List(options)
|
||||||
|
},
|
||||||
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
|
func(namespace string, options api.ListOptions) ([]extensions.ReplicaSet, error) {
|
||||||
rsList, err := c.Extensions().ReplicaSets(namespace).List(options)
|
rsList, err := c.Extensions().ReplicaSets(namespace).List(options)
|
||||||
return rsList.Items, err
|
return rsList.Items, err
|
||||||
@ -121,14 +129,8 @@ func GetNewReplicaSet(deployment extensions.Deployment, c clientset.Interface) (
|
|||||||
|
|
||||||
// GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function.
|
// GetNewReplicaSetFromList returns a replica set that matches the intent of the given deployment; get ReplicaSetList with the input function.
|
||||||
// Returns nil if the new replica set doesn't exist yet.
|
// Returns nil if the new replica set doesn't exist yet.
|
||||||
func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getRSList func(string, api.ListOptions) ([]extensions.ReplicaSet, error)) (*extensions.ReplicaSet, error) {
|
func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Interface, getPodList podListFunc, getRSList rsListFunc) (*extensions.ReplicaSet, error) {
|
||||||
namespace := deployment.ObjectMeta.Namespace
|
rsList, err := rsListWithHashKeySynced(deployment, c, getRSList, getPodList)
|
||||||
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("invalid label selector: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
rsList, err := getRSList(namespace, api.ListOptions{LabelSelector: selector})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("error listing ReplicaSets: %v", err)
|
return nil, fmt.Errorf("error listing ReplicaSets: %v", err)
|
||||||
}
|
}
|
||||||
@ -144,6 +146,136 @@ func GetNewReplicaSetFromList(deployment extensions.Deployment, c clientset.Inte
|
|||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// rsListWithHashKeySynced returns a list of rs the deployment targets, with pod-template-hash information synced.
|
||||||
|
func rsListWithHashKeySynced(deployment extensions.Deployment, c clientset.Interface, getRSList rsListFunc, getPodList podListFunc) ([]extensions.ReplicaSet, error) {
|
||||||
|
namespace := deployment.Namespace
|
||||||
|
selector, err := unversioned.LabelSelectorAsSelector(deployment.Spec.Selector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
options := api.ListOptions{LabelSelector: selector}
|
||||||
|
rsList, err := getRSList(namespace, options)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
syncedRSList := []extensions.ReplicaSet{}
|
||||||
|
for _, rs := range rsList {
|
||||||
|
// Add pod-template-hash information if it's not in the rs
|
||||||
|
if !labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
|
||||||
|
updatedRS, err := addHashKeyToReplicaSet(deployment, c, rs, getPodList)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
syncedRSList = append(syncedRSList, *updatedRS)
|
||||||
|
}
|
||||||
|
syncedRSList = append(syncedRSList, rs)
|
||||||
|
}
|
||||||
|
return syncedRSList, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// addHashKeyToReplicaSet adds pod-template-hash information to the given rs with the following steps:
|
||||||
|
// 1. Add hash label to the rs's pod template
|
||||||
|
// 2. Add hash label to all pods this rs owns
|
||||||
|
// 3. Add hash label to the rs's selector
|
||||||
|
// 4. Clean up all pods this rs owns but without the hash label (orphaned pods)
|
||||||
|
func addHashKeyToReplicaSet(deployment extensions.Deployment, c clientset.Interface, rs extensions.ReplicaSet, getPodList podListFunc) (*extensions.ReplicaSet, error) {
|
||||||
|
if labelsutil.SelectorHasLabel(rs.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey) {
|
||||||
|
return &rs, nil
|
||||||
|
}
|
||||||
|
namespace := deployment.Namespace
|
||||||
|
hash := podutil.GetPodTemplateSpecHash(*rs.Spec.Template)
|
||||||
|
// 1. Add hash template label to the rs. This ensures that any newly created pods will have the new label.
|
||||||
|
updatedRS, err := updateRSWithRetries(c.Extensions().ReplicaSets(namespace), &rs, func(updated *extensions.ReplicaSet) {
|
||||||
|
updated.Spec.Template.Labels = labelsutil.AddLabel(updated.Spec.Template.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Update all pods managed by the rs to have the new hash label, so they are correctly adopted.
|
||||||
|
selector, err := unversioned.LabelSelectorAsSelector(updatedRS.Spec.Selector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
options := api.ListOptions{LabelSelector: selector}
|
||||||
|
podList, err := getPodList(namespace, options)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
for _, pod := range podList.Items {
|
||||||
|
pod.Labels = labelsutil.AddLabel(pod.Labels, extensions.DefaultDeploymentUniqueLabelKey, hash)
|
||||||
|
delay, maxRetries := 3, 3
|
||||||
|
for i := 0; i < maxRetries; i++ {
|
||||||
|
_, err = c.Core().Pods(namespace).Update(&pod)
|
||||||
|
if err != nil {
|
||||||
|
time.Sleep(time.Second * time.Duration(delay))
|
||||||
|
delay *= delay
|
||||||
|
} else {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Update rs selector
|
||||||
|
// Copy the old selector, so that we can scrub out any orphaned pods
|
||||||
|
oldSelector := updatedRS.Spec.Selector
|
||||||
|
// Update the selector of the rs so it manages all the pods we updated above
|
||||||
|
if updatedRS, err = updateRSWithRetries(c.Extensions().ReplicaSets(namespace), updatedRS, func(updated *extensions.ReplicaSet) {
|
||||||
|
updated.Spec.Selector = labelsutil.AddLabelToSelector(updated.Spec.Selector, extensions.DefaultDeploymentUniqueLabelKey, hash)
|
||||||
|
}); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4. Clean up any orphaned pods that don't have the new label, this can happen if the rs manager
|
||||||
|
// doesn't see the update to its pod template and creates a new pod with the old labels after
|
||||||
|
// we've finished re-adopting existing pods to the rs.
|
||||||
|
selector, err = unversioned.LabelSelectorAsSelector(oldSelector)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
options = api.ListOptions{LabelSelector: selector}
|
||||||
|
podList, err = getPodList(namespace, options)
|
||||||
|
hashString := fmt.Sprintf("%d", hash)
|
||||||
|
for _, pod := range podList.Items {
|
||||||
|
if value, found := pod.Labels[extensions.DefaultDeploymentUniqueLabelKey]; !found || value != hashString {
|
||||||
|
if err := c.Core().Pods(namespace).Delete(pod.Name, nil); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return updatedRS, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type updateFunc func(rs *extensions.ReplicaSet)
|
||||||
|
|
||||||
|
func updateRSWithRetries(rsClient extensions_unversioned.ReplicaSetInterface, rs *extensions.ReplicaSet, applyUpdate updateFunc) (*extensions.ReplicaSet, error) {
|
||||||
|
var err error
|
||||||
|
oldRs := rs
|
||||||
|
err = wait.Poll(10*time.Millisecond, 1*time.Minute, func() (bool, error) {
|
||||||
|
// Apply the update, then attempt to push it to the apiserver.
|
||||||
|
applyUpdate(rs)
|
||||||
|
if rs, err = rsClient.Update(rs); err == nil {
|
||||||
|
// rs contains the latest controller post update
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
// Update the controller with the latest resource version, if the update failed we
|
||||||
|
// can't trust rs so use oldRs.Name.
|
||||||
|
if rs, err = rsClient.Get(oldRs.Name); err != nil {
|
||||||
|
// The Get failed: Value in rs cannot be trusted.
|
||||||
|
rs = oldRs
|
||||||
|
}
|
||||||
|
// The Get passed: rs contains the latest controller, expect a poll for the update.
|
||||||
|
return false, nil
|
||||||
|
})
|
||||||
|
// If the error is non-nil the returned controller cannot be trusted, if it is nil, the returned
|
||||||
|
// controller contains the applied update.
|
||||||
|
return rs, err
|
||||||
|
}
|
||||||
|
|
||||||
// Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.
|
// Returns the desired PodTemplateSpec for the new ReplicaSet corresponding to the given ReplicaSet.
|
||||||
func GetNewReplicaSetTemplate(deployment extensions.Deployment) api.PodTemplateSpec {
|
func GetNewReplicaSetTemplate(deployment extensions.Deployment) api.PodTemplateSpec {
|
||||||
// newRS will have the same template as in deployment spec, plus a unique label in some cases.
|
// newRS will have the same template as in deployment spec, plus a unique label in some cases.
|
||||||
|
@ -54,6 +54,19 @@ func CloneAndRemoveLabel(labels map[string]string, labelKey string) map[string]s
|
|||||||
return newLabels
|
return newLabels
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddLabel returns a map with the given key and value added to the given map.
|
||||||
|
func AddLabel(labels map[string]string, labelKey string, labelValue uint32) map[string]string {
|
||||||
|
if labelKey == "" {
|
||||||
|
// Dont need to add a label.
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
if labels == nil {
|
||||||
|
labels = make(map[string]string)
|
||||||
|
}
|
||||||
|
labels[labelKey] = fmt.Sprintf("%d", labelValue)
|
||||||
|
return labels
|
||||||
|
}
|
||||||
|
|
||||||
// Clones the given selector and returns a new selector with the given key and value added.
|
// Clones the given selector and returns a new selector with the given key and value added.
|
||||||
// Returns the given selector, if labelKey is empty.
|
// Returns the given selector, if labelKey is empty.
|
||||||
func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector {
|
func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector {
|
||||||
@ -93,3 +106,30 @@ func CloneSelectorAndAddLabel(selector *unversioned.LabelSelector, labelKey stri
|
|||||||
|
|
||||||
return newSelector
|
return newSelector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AddLabelToSelector returns a selector with the given key and value added to the given selector's MatchLabels.
|
||||||
|
func AddLabelToSelector(selector *unversioned.LabelSelector, labelKey string, labelValue uint32) *unversioned.LabelSelector {
|
||||||
|
if labelKey == "" {
|
||||||
|
// Dont need to add a label.
|
||||||
|
return selector
|
||||||
|
}
|
||||||
|
if selector.MatchLabels == nil {
|
||||||
|
selector.MatchLabels = make(map[string]string)
|
||||||
|
}
|
||||||
|
selector.MatchLabels[labelKey] = fmt.Sprintf("%d", labelValue)
|
||||||
|
return selector
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelectorHasLabel checks if the given selector contains the given label key in its MatchLabels or MatchExpressions
|
||||||
|
func SelectorHasLabel(selector *unversioned.LabelSelector, labelKey string) bool {
|
||||||
|
_, found := selector.MatchLabels[labelKey]
|
||||||
|
if found {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
for _, exp := range selector.MatchExpressions {
|
||||||
|
if exp.Key == labelKey && exp.Operator != unversioned.LabelSelectorOpDoesNotExist {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user