DaemonSet: Relist Pods before each phase of sync.

The design of DaemonSet requires a relist before each phase of sync (manage,
update, status), because the controller does not short-circuit and requeue
after each action it triggers: all phases run within a single sync pass, so
each phase must see the Pods that the previous phase created or deleted.
Anthony Yeh 2017-03-07 15:01:11 -08:00
parent e2deb1795d
commit fac372d090
3 changed files with 55 additions and 18 deletions
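The shape of the change, condensed from the hunks below (error handling and
the lister fetch elided; comments mark where each phase now does its own
relist of Pods from the informer cache):

    func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
        // ... fetch ds from the lister, derive dsKey ...

        if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) {
            // Pending creations/deletions from a previous pass: only update status.
            return dsc.updateDaemonSetStatus(ds) // relists Pods itself
        }

        if err := dsc.manage(ds); err != nil { // relists Pods itself
            return err
        }

        // Process rolling updates if we're ready.
        if dsc.expectations.SatisfiedExpectations(dsKey) {
            // ... dsc.rollingUpdate(ds) for the RollingUpdate strategy; relists Pods itself ...
        }

        return dsc.updateDaemonSetStatus(ds) // relists Pods itself
    }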

View File

@@ -504,7 +504,13 @@ func (dsc *DaemonSetsController) resolveControllerRef(namespace string, controll
 	return ds
 }
 
-func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
+func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
+	// Find out which nodes are running the daemon pods controlled by ds.
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
+	if err != nil {
+		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
+	}
+
 	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
 	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
 	nodeList, err := dsc.nodeLister.List(labels.Everything())
@@ -682,8 +688,12 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
 	return updateErr
 }
 
-func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
+func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error {
 	glog.V(4).Infof("Updating daemon set status")
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
+	if err != nil {
+		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
+	}
 
 	nodeList, err := dsc.nodeLister.List(labels.Everything())
 	if err != nil {
@@ -760,12 +770,6 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 		return nil
 	}
 
-	// Find out which nodes are running the daemon pods controlled by ds.
-	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
-	if err != nil {
-		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
-	}
-
 	// Don't process a daemon set until all its creations and deletions have been processed.
 	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
 	// then we do not want to call manage on foo until the daemon pods have been created.
@@ -773,25 +777,27 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 	if err != nil {
 		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 	}
-	dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey)
-	if dsNeedsSync && ds.DeletionTimestamp == nil {
-		if err := dsc.manage(ds, nodeToDaemonPods); err != nil {
-			return err
-		}
+	if ds.DeletionTimestamp != nil || !dsc.expectations.SatisfiedExpectations(dsKey) {
+		// Only update status.
+		return dsc.updateDaemonSetStatus(ds)
 	}
 
-	dsNeedsSync = dsc.expectations.SatisfiedExpectations(dsKey)
-	if dsNeedsSync && ds.DeletionTimestamp == nil {
+	if err := dsc.manage(ds); err != nil {
+		return err
+	}
+
+	// Process rolling updates if we're ready.
+	if dsc.expectations.SatisfiedExpectations(dsKey) {
 		switch ds.Spec.UpdateStrategy.Type {
 		case extensions.RollingUpdateDaemonSetStrategyType:
-			err = dsc.rollingUpdate(ds, nodeToDaemonPods)
+			err = dsc.rollingUpdate(ds)
 		}
 		if err != nil {
 			return err
 		}
 	}
 
-	return dsc.updateDaemonSetStatus(ds, nodeToDaemonPods)
+	return dsc.updateDaemonSetStatus(ds)
 }
 
 // nodeShouldRunDaemonPod checks a set of preconditions against a (node,daemonset) and returns a
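The two SatisfiedExpectations checks in the new code do different jobs: the
first (folded into the early return) skips manage() while creations or
deletions requested by a previous pass are still unobserved, and the second
gates the rolling-update phase on the pods manage() itself just requested in
this pass. For reference, a rough sketch of the expectations handshake, using
the ControllerExpectations methods from pkg/controller (simplified; the real
bookkeeping lives in manage() and the pod event handlers):

    // manage() records how many creations it is about to request:
    dsc.expectations.ExpectCreations(dsKey, len(nodesNeedingDaemonPods))

    // The shared informer's pod-add handler marks each one observed:
    dsc.expectations.CreationObserved(dsKey)

    // SatisfiedExpectations(dsKey) reports true only once all expected
    // creations/deletions have been observed (or the expectations expire).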

View File

@@ -346,6 +346,32 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0)
 }
 
+func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
+	manager, podControl, clientset := newTestController()
+
+	var updated *extensions.DaemonSet
+	clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
+		if action.GetSubresource() != "status" {
+			return false, nil, nil
+		}
+		if u, ok := action.(core.UpdateAction); ok {
+			updated = u.GetObject().(*extensions.DaemonSet)
+		}
+		return false, nil, nil
+	})
+
+	ds := newDaemonSet("foo")
+	manager.dsStore.Add(ds)
+	addNodes(manager.nodeStore, 0, 5, nil)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0)
+
+	// Make sure the single sync() updated Status already for the change made
+	// during the manage() phase.
+	if got, want := updated.Status.CurrentNumberScheduled, int32(5); got != want {
+		t.Errorf("Status.CurrentNumberScheduled = %v, want %v", got, want)
+	}
+}
+
 // DaemonSets should do nothing if there aren't any nodes
 func TestNoNodesDoesNothing(t *testing.T) {
 	manager, podControl, _ := newTestController()
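One convention worth noting in the new test: client-go's fake clientset runs
reactors in order, and a reaction that returns handled == false passes the
action through to the next reactor, ultimately the default object tracker. The
PrependReactor above therefore only records the status update for the
assertion at the end; it does not change what the fake clientset does:

    // ReactionFunc shape from k8s.io/client-go/testing:
    //   func(action core.Action) (handled bool, ret runtime.Object, err error)
    // handled == false -> fall through to later reactors (the default tracker applies the update)
    // handled == true  -> short-circuit the chain with ret/err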

View File

@@ -31,7 +31,12 @@ import (
 // rollingUpdate deletes old daemon set pods making sure that no more than
 // ds.Spec.UpdateStrategy.RollingUpdate.MaxUnavailable pods are unavailable
-func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet, nodeToDaemonPods map[string][]*v1.Pod) error {
+func (dsc *DaemonSetsController) rollingUpdate(ds *extensions.DaemonSet) error {
+	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
+	if err != nil {
+		return fmt.Errorf("couldn't get node to daemon pod mapping for daemon set %q: %v", ds.Name, err)
+	}
+
 	_, oldPods, err := dsc.getAllDaemonSetPods(ds, nodeToDaemonPods)
 	maxUnavailable, numUnavailable, err := dsc.getUnavailableNumbers(ds, nodeToDaemonPods)
 	if err != nil {
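rollingUpdate gets the same treatment as manage() and updateDaemonSetStatus:
it relists at the top instead of trusting a map computed once in
syncDaemonSet, since manage() may have created or deleted pods earlier in the
same pass. For orientation, a rough sketch of what getNodesToDaemonPods does
(simplified; the real method in the controller is more involved, and the
field/helper names here are abbreviated from its conventions):

    // List this DaemonSet's pods from the shared informer cache and
    // group them by the node each pod is assigned to.
    func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet) (map[string][]*v1.Pod, error) {
        selector, err := metav1.LabelSelectorAsSelector(ds.Spec.Selector)
        if err != nil {
            return nil, err
        }
        pods, err := dsc.podLister.Pods(ds.Namespace).List(selector)
        if err != nil {
            return nil, err
        }
        nodeToDaemonPods := make(map[string][]*v1.Pod)
        for _, pod := range pods {
            nodeToDaemonPods[pod.Spec.NodeName] = append(nodeToDaemonPods[pod.Spec.NodeName], pod)
        }
        return nodeToDaemonPods, nil
    }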