Construct history once and pass around in DaemonSet sync loop

This commit is contained in:
Janet Kuo 2017-06-09 11:03:38 -07:00
parent 0a1b7d94b4
commit f43060ea41
3 changed files with 46 additions and 45 deletions

View File

@ -714,18 +714,18 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll
return ds return ds
} }
func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) (string, error) { func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, hash string) error {
// Find out which nodes are running the daemon pods controlled by ds. // Find out which nodes are running the daemon pods controlled by ds.
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil { if err != nil {
return "", fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
} }
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon // For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node. // pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
nodeList, err := dsc.nodeLister.List(labels.Everything()) nodeList, err := dsc.nodeLister.List(labels.Everything())
if err != nil { if err != nil {
return "", fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err) return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
} }
var nodesNeedingDaemonPods, podsToDelete []string var nodesNeedingDaemonPods, podsToDelete []string
var failedPodsObserved int var failedPodsObserved int
@ -773,23 +773,17 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) (string, error
} }
} }
// Find current history of the DaemonSet, and label new pods using the hash label value of the current history when creating them // Label new pods using the hash label value of the current history when creating them
cur, _, err := dsc.constructHistory(ds)
if err != nil {
return "", fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil { if err = dsc.syncNodes(ds, podsToDelete, nodesNeedingDaemonPods, hash); err != nil {
return "", err return err
} }
// Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop // Throw an error when the daemon pods fail, to use ratelimiter to prevent kill-recreate hot loop
if failedPodsObserved > 0 { if failedPodsObserved > 0 {
return "", fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name) return fmt.Errorf("deleted %d failed pods of DaemonSet %s/%s", failedPodsObserved, ds.Namespace, ds.Name)
} }
return hash, nil return nil
} }
// syncNodes deletes given pods and creates new daemon set pods on the given nodes // syncNodes deletes given pods and creates new daemon set pods on the given nodes
@ -902,7 +896,7 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
return updateErr return updateErr
} }
func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error { func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet, hash string) error {
glog.V(4).Infof("Updating daemon set status") glog.V(4).Infof("Updating daemon set status")
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds) nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil { if err != nil {
@ -937,7 +931,7 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
numberAvailable++ numberAvailable++
} }
} }
if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, ds.Labels[extensions.DefaultDaemonSetUniqueLabelKey]) { if util.IsPodUpdated(ds.Spec.TemplateGeneration, pod, hash) {
updatedNumberScheduled++ updatedNumberScheduled++
} }
} }
@ -990,12 +984,20 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
if err != nil { if err != nil {
return fmt.Errorf("couldn't get key for object %#v: %v", ds, err) return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
} }
// Construct histories of the DaemonSet, and get the hash of current history
cur, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
hash := cur.Labels[extensions.DefaultDaemonSetUniqueLabelKey]
if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) { if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) {
// Only update status. // Only update status.
return dsc.updateDaemonSetStatus(ds) return dsc.updateDaemonSetStatus(ds, hash)
} }
hash, err := dsc.manage(ds) err = dsc.manage(ds, hash)
if err != nil { if err != nil {
return err return err
} }
@ -1012,12 +1014,12 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
} }
} }
err = dsc.cleanupHistory(ds) err = dsc.cleanupHistory(ds, old)
if err != nil { if err != nil {
return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err) return fmt.Errorf("failed to clean up revisions of DaemonSet: %v", err)
} }
return dsc.updateDaemonSetStatus(ds) return dsc.updateDaemonSetStatus(ds, hash)
} }
// hasIntentionalPredicatesReasons checks if any of the given predicate failure reasons // hasIntentionalPredicatesReasons checks if any of the given predicate failure reasons

View File

@ -166,9 +166,27 @@ func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string]
func newPod(podName string, nodeName string, label map[string]string, ds *extensions.DaemonSet) *v1.Pod { func newPod(podName string, nodeName string, label map[string]string, ds *extensions.DaemonSet) *v1.Pod {
// Add hash unique label to the pod // Add hash unique label to the pod
newLabels := label newLabels := label
var podSpec v1.PodSpec
// Copy pod spec from DaemonSet template, or use a default one if DaemonSet is nil
if ds != nil { if ds != nil {
hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount)) hash := fmt.Sprint(controller.ComputeHash(&ds.Spec.Template, ds.Status.CollisionCount))
newLabels = labelsutil.CloneAndAddLabel(label, extensions.DefaultDaemonSetUniqueLabelKey, hash) newLabels = labelsutil.CloneAndAddLabel(label, extensions.DefaultDaemonSetUniqueLabelKey, hash)
podSpec = ds.Spec.Template.Spec
} else {
podSpec = v1.PodSpec{
Containers: []v1.Container{
{
Image: "foo/bar",
TerminationMessagePath: v1.TerminationMessagePathDefault,
ImagePullPolicy: v1.PullIfNotPresent,
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
},
},
}
}
// Add node name to the pod
if len(nodeName) > 0 {
podSpec.NodeName = nodeName
} }
pod := &v1.Pod{ pod := &v1.Pod{
@ -178,18 +196,7 @@ func newPod(podName string, nodeName string, label map[string]string, ds *extens
Labels: newLabels, Labels: newLabels,
Namespace: metav1.NamespaceDefault, Namespace: metav1.NamespaceDefault,
}, },
Spec: v1.PodSpec{ Spec: podSpec,
NodeName: nodeName,
Containers: []v1.Container{
{
Image: "foo/bar",
TerminationMessagePath: v1.TerminationMessagePathDefault,
ImagePullPolicy: v1.PullIfNotPresent,
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
},
},
DNSPolicy: v1.DNSDefault,
},
} }
pod.Name = names.SimpleNameGenerator.GenerateName(podName) pod.Name = names.SimpleNameGenerator.GenerateName(podName)
if ds != nil { if ds != nil {
@ -718,14 +725,8 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec ds.Spec.Template.Spec = podSpec
manager.dsStore.Add(ds) manager.dsStore.Add(ds)
manager.podStore.Add(&v1.Pod{ pod := newPod(ds.Name+"-", node.Name, simpleDaemonSetLabel, ds)
ObjectMeta: metav1.ObjectMeta{ manager.podStore.Add(pod)
Labels: simpleDaemonSetLabel,
Namespace: metav1.NamespaceDefault,
OwnerReferences: []metav1.OwnerReference{*newControllerRef(ds)},
},
Spec: podSpec,
})
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0) syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
} }
} }

View File

@ -79,7 +79,9 @@ func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet, hash st
return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash) return dsc.syncNodes(ds, oldPodsToDelete, []string{}, hash)
} }
// constructHistory returns current history and a list of old histories of a given DaemonSet. // constructHistory finds all histories controlled by the given DaemonSet, and
// update current history revision number, or create current history if need to.
// It also deduplicates current history, and adds missing unique labels to existing histories.
func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) { func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur *apps.ControllerRevision, old []*apps.ControllerRevision, err error) {
var histories []*apps.ControllerRevision var histories []*apps.ControllerRevision
var currentHistories []*apps.ControllerRevision var currentHistories []*apps.ControllerRevision
@ -161,15 +163,11 @@ func (dsc *DaemonSetsController) constructHistory(ds *extensions.DaemonSet) (cur
return cur, old, err return cur, old, err
} }
func (dsc *DaemonSetsController) cleanupHistory(ds *extensions.DaemonSet) error { func (dsc *DaemonSetsController) cleanupHistory(ds *extensions.DaemonSet, old []*apps.ControllerRevision) error {
nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds) nodesToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
if err != nil { if err != nil {
return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err) return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
} }
_, old, err := dsc.constructHistory(ds)
if err != nil {
return fmt.Errorf("failed to construct revisions of DaemonSet: %v", err)
}
toKeep := int(*ds.Spec.RevisionHistoryLimit) toKeep := int(*ds.Spec.RevisionHistoryLimit)
toKill := len(old) - toKeep toKill := len(old) - toKeep