Merge pull request #32210 from deads2k/controller-01-cachewait-handle

Automatic merge from submit-queue

update error handling for daemoncontroller

Updates the DaemonSet controller to cleanly requeue with rate limiting on errors, use `utilruntime.HandleError` consistently, and wait for its preconditions (synced caches) before doing work.

@ncdc @liggitt @sttts My plan is to use this one as an example of how to handle requeuing, preconditions, and error handling during processing.
@foxish fyi

related to https://github.com/kubernetes/kubernetes/issues/30629
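
For anyone picking this up as the template, here is the shape of the pattern as a minimal sketch. This is illustrative only, not code from this PR: `exampleController` and its `syncHandler` are hypothetical stand-ins, and the import paths assume the tree as of this change (see the diff below for the real controller code and the actual `workqueue`/`framework` APIs it relies on).

```go
package example

import (
	"fmt"
	"time"

	"k8s.io/kubernetes/pkg/controller/framework"
	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
	"k8s.io/kubernetes/pkg/util/wait"
	"k8s.io/kubernetes/pkg/util/workqueue"
)

type exampleController struct {
	queue       workqueue.RateLimitingInterface
	cacheSynced framework.InformerSynced // e.g. an informer's HasSynced
	syncHandler func(key string) error   // injected so tests can stub it
}

// Run waits for the precondition (populated caches) before starting workers,
// and shuts the queue down on exit so workers drain and stop.
func (c *exampleController) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer c.queue.ShutDown()

	if !framework.WaitForCacheSync(stopCh, c.cacheSynced) {
		return // stop was requested before the caches populated
	}
	for i := 0; i < workers; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}
	<-stopCh
}

func (c *exampleController) runWorker() {
	for c.processNextWorkItem() {
	}
}

// processNextWorkItem handles one key off the queue: on success it resets the
// key's backoff with Forget; on failure it reports through HandleError and
// requeues with AddRateLimited so retries back off instead of hot-looping.
func (c *exampleController) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.syncHandler(key.(string)); err != nil {
		utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
		c.queue.AddRateLimited(key)
		return true
	}
	c.queue.Forget(key)
	return true
}
```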
Authored by Kubernetes Submit Queue on 2016-09-12 17:13:19 -07:00, committed by GitHub
commit 51b5ff59b9
5 changed files with 135 additions and 69 deletions


@@ -38,6 +38,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
+	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -55,10 +56,6 @@ const (
 	// performance requirements for kubernetes 1.0.
 	BurstReplicas = 500
-	// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
-	// avoid a hot loop, we'll wait this long between checks.
-	PodStoreSyncedPollPeriod = 100 * time.Millisecond
 	// If sending a status upate to API server fails, we retry a finite number of times.
 	StatusUpdateRetries = 1
 )
@@ -99,12 +96,15 @@ type DaemonSetsController struct {
 	nodeController *framework.Controller
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
-	podStoreSynced func() bool
+	podStoreSynced framework.InformerSynced
+	// nodeStoreSynced returns true if the node store has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	nodeStoreSynced framework.InformerSynced
 	lookupCache *controller.MatchingCache
-	// Daemon sets that need to be synced.
-	queue *workqueue.Type
+	// DaemonSet keys that need to be synced.
+	queue workqueue.RateLimitingInterface
 }
 
 func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
@@ -125,7 +125,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
 		},
 		burstReplicas: BurstReplicas,
 		expectations:  controller.NewControllerExpectations(),
-		queue:         workqueue.NewNamed("daemonset"),
+		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
 	}
 	// Manage addition/update of daemon sets.
 	dsc.dsStore.Store, dsc.dsController = framework.NewInformer(
@@ -199,6 +199,8 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
 			UpdateFunc: dsc.updateNode,
 		},
 	)
+	dsc.nodeStoreSynced = dsc.nodeController.HasSynced
+
 	dsc.syncHandler = dsc.syncDaemonSet
 	dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return dsc
@@ -233,10 +235,17 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer dsc.queue.ShutDown()
 
 	glog.Infof("Starting Daemon Sets controller manager")
 	go dsc.dsController.Run(stopCh)
 	go dsc.podController.Run(stopCh)
 	go dsc.nodeController.Run(stopCh)
+
+	if !framework.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) {
+		return
+	}
+
 	for i := 0; i < workers; i++ {
 		go wait.Until(dsc.runWorker, time.Second, stopCh)
 	}
@@ -247,21 +256,31 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	<-stopCh
 	glog.Infof("Shutting down Daemon Set Controller")
-	dsc.queue.ShutDown()
 }
 
 func (dsc *DaemonSetsController) runWorker() {
-	for {
-		dsKey, quit := dsc.queue.Get()
-		if quit {
-			continue
-		}
-		err := dsc.syncHandler(dsKey.(string))
-		if err != nil {
-			glog.Errorf("Error syncing daemon set with key %s: %v", dsKey.(string), err)
-		}
-		dsc.queue.Done(dsKey)
+	for dsc.processNextWorkItem() {
 	}
 }
+
+// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
+func (dsc *DaemonSetsController) processNextWorkItem() bool {
+	dsKey, quit := dsc.queue.Get()
+	if quit {
+		return false
+	}
+	defer dsc.queue.Done(dsKey)
+
+	err := dsc.syncHandler(dsKey.(string))
+	if err == nil {
+		dsc.queue.Forget(dsKey)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err))
+	dsc.queue.AddRateLimited(dsKey)
+
+	return true
+}
 
 func (dsc *DaemonSetsController) enqueueDaemonSet(ds *extensions.DaemonSet) {
@@ -467,18 +486,18 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
 	return nodeToDaemonPods, nil
 }
 
-func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
+func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
 	// Find out which nodes are running the daemon pods selected by ds.
 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 	if err != nil {
-		glog.Errorf("Error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
+		return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
 	}
 
 	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
 	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
 	nodeList, err := dsc.nodeStore.List()
 	if err != nil {
-		glog.Errorf("Couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
+		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
 	}
 	var nodesNeedingDaemonPods, podsToDelete []string
 	for _, node := range nodeList.Items {
@@ -508,8 +527,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 	// We need to set expectations before creating/deleting pods to avoid race conditions.
 	dsKey, err := controller.KeyFunc(ds)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
-		return
+		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 	}
 
 	createDiff := len(nodesNeedingDaemonPods)
@@ -524,6 +542,9 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
 
+	// error channel to communicate back failures. make the buffer big enough to avoid any blocking
+	errCh := make(chan error, createDiff+deleteDiff)
+
 	glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
 	createWait := sync.WaitGroup{}
 	createWait.Add(createDiff)
@@ -533,6 +554,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 			if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &ds.Spec.Template, ds); err != nil {
 				glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.CreationObserved(dsKey)
+				errCh <- err
 				utilruntime.HandleError(err)
 			}
 		}(i)
@@ -548,11 +570,20 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
 				glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.DeletionObserved(dsKey)
+				errCh <- err
 				utilruntime.HandleError(err)
 			}
 		}(i)
 	}
 	deleteWait.Wait()
+
+	// collect errors if any for proper reporting/retry logic in the controller
+	errors := []error{}
+	close(errCh)
+	for err := range errCh {
+		errors = append(errors, err)
+	}
+	return utilerrors.NewAggregate(errors)
 }
 
 func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
@@ -582,18 +613,16 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
 	return updateErr
 }
 
-func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) {
+func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error {
 	glog.V(4).Infof("Updating daemon set status")
 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 	if err != nil {
-		glog.Errorf("Error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
-		return
+		return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
 	}
 
 	nodeList, err := dsc.nodeStore.List()
 	if err != nil {
-		glog.Errorf("Couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
-		return
+		return fmt.Errorf("couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
 	}
 
 	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
@@ -616,8 +645,10 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
 	err = storeDaemonSetStatus(dsc.kubeClient.Extensions().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled)
 	if err != nil {
-		glog.Errorf("Error storing status for daemon set %#v: %v", ds, err)
+		return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
 	}
+
+	return nil
 }
 
 func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
@@ -626,19 +657,9 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 		glog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	if !dsc.podStoreSynced() {
-		// Sleep so we give the pod reflector goroutine a chance to run.
-		time.Sleep(PodStoreSyncedPollPeriod)
-		glog.Infof("Waiting for pods controller to sync, requeuing ds %v", key)
-		dsc.queue.Add(key)
-		return nil
-	}
-
 	obj, exists, err := dsc.dsStore.Store.GetByKey(key)
 	if err != nil {
-		glog.Infof("Unable to retrieve ds %v from store: %v", key, err)
-		dsc.queue.Add(key)
-		return err
+		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
 	}
 	if !exists {
 		glog.V(3).Infof("daemon set has been deleted %v", key)
@@ -658,16 +679,16 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 	// then we do not want to call manage on foo until the daemon pods have been created.
 	dsKey, err := controller.KeyFunc(ds)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
-		return err
+		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 	}
 	dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey)
 	if dsNeedsSync && ds.DeletionTimestamp == nil {
-		dsc.manage(ds)
+		if err := dsc.manage(ds); err != nil {
+			return err
+		}
 	}
 
-	dsc.updateDaemonSetStatus(ds)
-	return nil
+	return dsc.updateDaemonSetStatus(ds)
 }
 
 func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {


@@ -138,6 +138,7 @@ func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0)
 	manager.podStoreSynced = alwaysReady
+	manager.nodeStoreSynced = alwaysReady
 	podControl := &controller.FakePodControl{}
 	manager.podControl = podControl
 	return manager, podControl
@@ -539,28 +540,6 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
 }
 
-func TestDSManagerNotReady(t *testing.T) {
-	manager, podControl := newTestController()
-	manager.podStoreSynced = func() bool { return false }
-	addNodes(manager.nodeStore.Store, 0, 1, nil)
-
-	// Simulates the ds reflector running before the pod reflector. We don't
-	// want to end up creating daemon pods in this case until the pod reflector
-	// has synced, so the ds manager should just requeue the ds.
-	ds := newDaemonSet("foo")
-	manager.dsStore.Add(ds)
-
-	dsKey := getKey(ds, t)
-	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
-
-	queueDS, _ := manager.queue.Get()
-	if queueDS != dsKey {
-		t.Fatalf("Expected to find key %v in queue, found %v", dsKey, queueDS)
-	}
-
-	manager.podStoreSynced = alwaysReady
-	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
-}
-
 // Daemon with node affinity should launch pods on nodes matching affinity.
 func TestNodeAffinityDaemonLaunchesPods(t *testing.T) {
 	manager, podControl := newTestController()


@@ -21,9 +21,12 @@ import (
 	"sync"
 	"time"
 
+	"github.com/golang/glog"
+
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/wait"
 )
 
 // if you use this, there is one behavior change compared to a standard Informer.
@@ -75,6 +78,34 @@ func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resy
 	return sharedIndexInformer
 }
 
+// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
+type InformerSynced func() bool
+
+// syncedPollPeriod controls how often you look at the status of your sync funcs
+const syncedPollPeriod = 100 * time.Millisecond
+
+// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
+// if the contoller should shutdown
+func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
+	err := wait.PollUntil(syncedPollPeriod,
+		func() (bool, error) {
+			for _, syncFunc := range cacheSyncs {
+				if !syncFunc() {
+					return false, nil
+				}
+			}
+			return true, nil
+		},
+		stopCh)
+	if err != nil {
+		glog.V(2).Infof("stop requested")
+		return false
+	}
+
+	glog.V(4).Infof("caches populated")
+	return true
+}
+
 type sharedIndexInformer struct {
 	indexer cache.Indexer
 	controller *Controller


@@ -186,7 +186,12 @@ func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error {
 func PollInfinite(interval time.Duration, condition ConditionFunc) error {
 	done := make(chan struct{})
 	defer close(done)
-	return WaitFor(poller(interval, 0), condition, done)
+	return PollUntil(interval, condition, done)
+}
+
+// PollUntil is like Poll, but it takes a stop change instead of total duration
+func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
+	return WaitFor(poller(interval, 0), condition, stopCh)
 }
 
 // WaitFunc creates a channel that receives an item every time a test


@@ -432,3 +432,33 @@ func TestWaitForWithDelay(t *testing.T) {
 		t.Errorf("expected an ack of the done signal.")
 	}
 }
+
+func TestPollUntil(t *testing.T) {
+	stopCh := make(chan struct{})
+	called := make(chan bool)
+	pollDone := make(chan struct{})
+
+	go func() {
+		PollUntil(time.Microsecond, ConditionFunc(func() (bool, error) {
+			called <- true
+			return false, nil
+		}), stopCh)
+
+		close(pollDone)
+	}()
+
+	// make sure we're called once
+	<-called
+	// this should trigger a "done"
+	close(stopCh)
+
+	go func() {
+		// release the condition func if needed
+		for {
+			<-called
+		}
+	}()
+
+	// make sure we finished the poll
+	<-pollDone
+}