Merge pull request #47258 from janetkuo/ds-update-hash-move

Automatic merge from submit-queue

Do not add unique label to DaemonSet

**What this PR does / why we need it**:

It's mainly for #46925. DaemonSet controller adds a unique label to DaemonSet, which is unexpected to federation. 

The 1st commit addressed #46981 to construct history once and pass it around, so that we can avoid adding that unique label in DaemonSet in the 2nd commit. ~The 3rd commit just reverts the band-aid PR #47103.~


**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #46925, xref #46981

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
This commit is contained in:
Kubernetes Submit Queue 2017-06-14 15:58:16 -07:00 committed by GitHub
commit 3cb7796762
4 changed files with 50 additions and 67 deletions

View File

@ -714,18 +714,18 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll
return ds return ds
} }
func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) (string, error) { func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, hash string) error {
// Find out which nodes are running the daemon pods controlled by ds. // Find out which nodes are running the daemon pods controlled by ds.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil { if err != nil {
return "", fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
} }
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node. // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
nodeList, err := dsc.nodeLister.List(labels.Everything()) nodeList, err := dsc.nodeLister.List(labels.Everything())
if err != nil { if err != nil {
return "", fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err) return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
} }
var nodesNeedingDaemonPods, podsToDelete []string var nodesNeedingDaemonPods, podsToDelete []string
var failedPodsObserved int var failedPodsObserved int
@ -773,23 +773,17 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) (string, error
} }
} }
// Find current history of the DaemonSet, and label new pods using the hash label value of the current history when creating them // Label new pods using the hash label value of the current history when creating them
cur, _, err := dsc.constructHistory(ds)
if err != nil {
return "", fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil { if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return "", err return err
} }
// Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop // Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop
if failedPodsObserved > 0 { if failedPodsObserved > 0 {
return "", fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name) return fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)
} }
return hash, nil return nil
} }
// syncNodes deletes given pods and creates new daemon set pods on the given nodes // syncNodes deletes given pods and creates new daemon set pods on the given nodes
@ -902,7 +896,7 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
return updateErr return updateErr
} }
func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error { func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet, hash string) error {
glog.V(4).Infof("Updating daemon set status") glog.V(4).Infof("Updating daemon set status")
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil { if err != nil {
@ -937,7 +931,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
numberAvailable++ numberAvailable++
} }
} }
if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey]) { if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, hash) {
updatedNumberScheduled++ updatedNumberScheduled++
} }
} }
@ -990,12 +984,20 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
if err != nil { if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err) return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
} }
// Construct histories of the DaemonSet, and get the hash of current history
cur, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) { if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) {
// Only update status. // Only update status.
return dsc.updateDaemonSetStatus(ds) return dsc.updateDaemonSetStatus(ds, hash)
} }
hash, err := dsc.manage(ds) err = dsc.manage(ds, hash)
if err != nil { if err != nil {
return err return err
} }
@ -1012,12 +1014,12 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
} }
} }
err = dsc.cleanupHistory(ds) err = dsc.cleanupHistory(ds, old)
if err != nil { if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err) return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
} }
return dsc.updateDaemonSetStatus(ds) return dsc.updateDaemonSetStatus(ds, hash)
} }
// hasIntentionalPredicatesReasons checks if any of the given predicate failure reasons // hasIntentionalPredicatesReasons checks if any of the given predicate failure reasons

View File

@ -166,9 +166,27 @@ func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string]
func newPod(podName string, nodeName string, label map[string]string, ds *extensions.DaemonSet) *v1.Pod { func newPod(podName string, nodeName string, label map[string]string, ds *extensions.DaemonSet) *v1.Pod {
// Add hash unique label to the pod // Add hash unique label to the pod
newLabels := label newLabels := label
var podSpec v1.PodSpec
// Copy pod spec from DaemonSet template, or use a default one if DaemonSet is nil
if ds != nil { if ds != nil {
hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)) hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
newLabels = labelsutil.CloneAndAddLabel(label, extensions.DefaultDaemonSetUniqueLabelKey, hash) newLabels = labelsutil.CloneAndAddLabel(label, extensions.DefaultDaemonSetUniqueLabelKey, hash)
podSpec = ds.Spec.Template.Spec
} else {
podSpec = v1.PodSpec{
Containers: []v1.Container{
{
Image: "foo/bar",
TerminationMessagePath: v1.TerminationMessagePathDefault,
ImagePullPolicy: v1.PullIfNotPresent,
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
},
},
}
}
// Add node name to the pod
if len(nodeName) > 0 {
podSpec.NodeName = nodeName
} }
pod := &v1.Pod{ pod := &v1.Pod{
@ -178,18 +196,7 @@ func newPod(podName string, nodeName string, label map[string]string, ds *extens
Labels: newLabels, Labels: newLabels,
Namespace: metav1.NamespaceDefault, Namespace: metav1.NamespaceDefault,
}, },
Spec: v1.PodSpec{ Spec: podSpec,
NodeName: nodeName,
Containers: []v1.Container{
{
Image: "foo/bar",
TerminationMessagePath: v1.TerminationMessagePathDefault,
ImagePullPolicy: v1.PullIfNotPresent,
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
},
},
DNSPolicy: v1.DNSDefault,
},
} }
pod.Name = names.SimpleNameGenerator.GenerateName(podName) pod.Name = names.SimpleNameGenerator.GenerateName(podName)
if ds != nil { if ds != nil {
@ -718,14 +725,8 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec ds.Spec.Template.Spec = podSpec
manager.dsStore.Add(ds) manager.dsStore.Add(ds)
manager.podStore.Add(&v1.Pod{ pod := newPod(ds.Name+"-", node.Name, simpleDaemonSetLabel, ds)
ObjectMeta: metav1.ObjectMeta{ manager.podStore.Add(pod)
Labels: simpleDaemonSetLabel,
Namespace: metav1.NamespaceDefault,
OwnerReferences: []metav1.OwnerReference{*newControllerRef(ds)},
},
Spec: podSpec,
})
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
} }
} }

View File

@ -79,7 +79,9 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet, hash st
return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash) return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
} }
// constructHistory returns current history and a list of old histories of a given DaemonSet. // constructHistory finds all histories controlled by the given DaemonSet, and
// update current history revision number, or create current history if need to.
// It also deduplicates current history, and adds missing unique labels to existing histories.
func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) { func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) {
var histories []*apps.ControllerRevision var histories []*apps.ControllerRevision
var currentHistories []*apps.ControllerRevision var currentHistories []*apps.ControllerRevision
@ -144,32 +146,14 @@ func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur
} }
} }
} }
// Label ds with current history's unique label as well
if ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey] != cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey] {
var clone interface{}
clone, err = api.Scheme.DeepCopy(ds)
if err != nil {
return nil, nil, err
}
toUpdate := clone.(*extensions.DaemonSet)
if toUpdate.Labels == nil {
toUpdate.Labels = make(map[string]string)
}
toUpdate.Labels[extensions.DefaultDaemonSetUniqueLabelKey] = cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
_, err = dsc.kubeClient.ExtensionsV1beta1().DaemonSets(ds.Namespace).UpdateStatus(toUpdate)
}
return cur, old, err return cur, old, err
} }
func (dsc *DaemonSetsController) cleanupHistory(ds *extensions.DaemonSet) error { func (dsc *DaemonSetsController) cleanupHistory(ds *extensions.DaemonSet, old []*apps.ControllerRevision) error {
nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds) nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil { if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
} }
_, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
toKeep := int(*ds.Spec.RevisionHistoryLimit) toKeep := int(*ds.Spec.RevisionHistoryLimit)
toKill := len(old) - toKeep toKill := len(old) - toKeep

View File

@ -270,8 +270,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
ds, err = c.Extensions().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{}) ds, err = c.Extensions().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
first := curHistory(listDaemonHistories(c, ns, label), ds) first := curHistory(listDaemonHistories(c, ns, label), ds)
firstHash := ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey] firstHash := first.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
Expect(first.Labels[extensions.DefaultDaemonSetUniqueLabelKey]).To(Equal(firstHash))
Expect(first.Revision).To(Equal(int64(1))) Expect(first.Revision).To(Equal(int64(1)))
checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash, templateGeneration) checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash, templateGeneration)
@ -297,9 +296,8 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
ds, err = c.Extensions().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{}) ds, err = c.Extensions().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cur := curHistory(listDaemonHistories(c, ns, label), ds) cur := curHistory(listDaemonHistories(c, ns, label), ds)
curHash := ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
Expect(cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]).To(Equal(curHash))
Expect(cur.Revision).To(Equal(int64(2))) Expect(cur.Revision).To(Equal(int64(2)))
Expect(cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]).NotTo(Equal(firstHash))
checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash, templateGeneration) checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), firstHash, templateGeneration)
}) })
@ -327,8 +325,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
ds, err = c.Extensions().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{}) ds, err = c.Extensions().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cur := curHistory(listDaemonHistories(c, ns, label), ds) cur := curHistory(listDaemonHistories(c, ns, label), ds)
hash := ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey] hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
Expect(cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]).To(Equal(hash))
Expect(cur.Revision).To(Equal(int64(1))) Expect(cur.Revision).To(Equal(int64(1)))
checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash, fmt.Sprint(templateGeneration)) checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash, fmt.Sprint(templateGeneration))
@ -355,8 +352,7 @@ var _ = framework.KubeDescribe("Daemon set [Serial]", func() {
ds, err = c.Extensions().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{}) ds, err = c.Extensions().DaemonSets(ns).Get(ds.Name, metav1.GetOptions{})
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
cur = curHistory(listDaemonHistories(c, ns, label), ds) cur = curHistory(listDaemonHistories(c, ns, label), ds)
hash = ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey] hash = cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
Expect(cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]).To(Equal(hash))
Expect(cur.Revision).To(Equal(int64(2))) Expect(cur.Revision).To(Equal(int64(2)))
checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash, fmt.Sprint(templateGeneration)) checkDaemonSetPodsLabels(listDaemonPods(c, ns, label), hash, fmt.Sprint(templateGeneration))
}) })