diff --git a/pkg/controller/daemon/daemoncontroller.go b/pkg/controller/daemon/daemoncontroller.go
index 3e1726f80d3..f3f40b7ddfd 100644
--- a/pkg/controller/daemon/daemoncontroller.go
+++ b/pkg/controller/daemon/daemoncontroller.go
@@ -38,6 +38,7 @@ import (
 	"k8s.io/kubernetes/pkg/controller/framework/informers"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/runtime"
+	utilerrors "k8s.io/kubernetes/pkg/util/errors"
 	"k8s.io/kubernetes/pkg/util/metrics"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
 	"k8s.io/kubernetes/pkg/util/wait"
@@ -55,10 +56,6 @@ const (
 	// performance requirements for kubernetes 1.0.
 	BurstReplicas = 500
 
-	// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
-	// avoid a hot loop, we'll wait this long between checks.
-	PodStoreSyncedPollPeriod = 100 * time.Millisecond
-
 	// If sending a status update to the API server fails, we retry a finite number of times.
 	StatusUpdateRetries = 1
 )
@@ -99,12 +96,15 @@ type DaemonSetsController struct {
 	nodeController *framework.Controller
 	// podStoreSynced returns true if the pod store has been synced at least once.
 	// Added as a member to the struct to allow injection for testing.
-	podStoreSynced func() bool
+	podStoreSynced framework.InformerSynced
+	// nodeStoreSynced returns true if the node store has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	nodeStoreSynced framework.InformerSynced
 
 	lookupCache *controller.MatchingCache
 
-	// Daemon sets that need to be synced.
-	queue *workqueue.Type
+	// DaemonSet keys that need to be synced.
+	queue workqueue.RateLimitingInterface
 }
 
 func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
@@ -125,7 +125,7 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
 		},
 		burstReplicas: BurstReplicas,
 		expectations:  controller.NewControllerExpectations(),
-		queue:         workqueue.NewNamed("daemonset"),
+		queue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "daemonset"),
 	}
 	// Manage addition/update of daemon sets.
 	dsc.dsStore.Store, dsc.dsController = framework.NewInformer(
@@ -199,6 +199,8 @@ func NewDaemonSetsController(podInformer framework.SharedIndexInformer, kubeClie
 			UpdateFunc: dsc.updateNode,
 		},
 	)
+	dsc.nodeStoreSynced = dsc.nodeController.HasSynced
+
 	dsc.syncHandler = dsc.syncDaemonSet
 	dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return dsc
@@ -233,10 +235,17 @@ func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {
 
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
+	defer dsc.queue.ShutDown()
+
 	glog.Infof("Starting Daemon Sets controller manager")
 	go dsc.dsController.Run(stopCh)
 	go dsc.podController.Run(stopCh)
 	go dsc.nodeController.Run(stopCh)
+
+	if !framework.WaitForCacheSync(stopCh, dsc.podStoreSynced, dsc.nodeStoreSynced) {
+		return
+	}
+
 	for i := 0; i < workers; i++ {
 		go wait.Until(dsc.runWorker, time.Second, stopCh)
 	}
@@ -247,23 +256,33 @@ func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
 
 	<-stopCh
 	glog.Infof("Shutting down Daemon Set Controller")
-	dsc.queue.ShutDown()
 }
 
 func (dsc *DaemonSetsController) runWorker() {
-	for {
-		dsKey, quit := dsc.queue.Get()
-		if quit {
-			continue
-		}
-		err := dsc.syncHandler(dsKey.(string))
-		if err != nil {
-			glog.Errorf("Error syncing daemon set with key %s: %v", dsKey.(string), err)
-		}
-		dsc.queue.Done(dsKey)
+	for dsc.processNextWorkItem() {
 	}
 }
 
+// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
+func (dsc *DaemonSetsController) processNextWorkItem() bool {
+	dsKey, quit := dsc.queue.Get()
+	if quit {
+		return false
+	}
+	defer dsc.queue.Done(dsKey)
+
+	err := dsc.syncHandler(dsKey.(string))
+	if err == nil {
+		dsc.queue.Forget(dsKey)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("%v failed with: %v", dsKey, err))
+	dsc.queue.AddRateLimited(dsKey)
+
+	return true
+}
+
 func (dsc *DaemonSetsController) enqueueDaemonSet(ds *extensions.DaemonSet) {
 	key, err := controller.KeyFunc(ds)
 	if err != nil {
@@ -467,18 +486,18 @@ func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *extensions.DaemonSet)
 	return nodeToDaemonPods, nil
 }
 
-func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
+func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) error {
 	// Find out which nodes are running the daemon pods selected by ds.
 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 	if err != nil {
-		glog.Errorf("Error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
+		return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
 	}
 
 	// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
 	// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
 	nodeList, err := dsc.nodeStore.List()
 	if err != nil {
-		glog.Errorf("Couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
+		return fmt.Errorf("couldn't get list of nodes when syncing daemon set %#v: %v", ds, err)
 	}
 	var nodesNeedingDaemonPods, podsToDelete []string
 	for _, node := range nodeList.Items {
@@ -508,8 +527,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 	// We need to set expectations before creating/deleting pods to avoid race conditions.
 	dsKey, err := controller.KeyFunc(ds)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
-		return
+		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 	}
 
 	createDiff := len(nodesNeedingDaemonPods)
@@ -524,6 +542,9 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 
 	dsc.expectations.SetExpectations(dsKey, createDiff, deleteDiff)
 
+	// Error channel to communicate back failures; make the buffer big enough to avoid any blocking.
+	errCh := make(chan error, createDiff+deleteDiff)
+
 	glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v, creating %d", ds.Name, nodesNeedingDaemonPods, createDiff)
 	createWait := sync.WaitGroup{}
 	createWait.Add(createDiff)
@@ -533,6 +554,7 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 			if err := dsc.podControl.CreatePodsOnNode(nodesNeedingDaemonPods[ix], ds.Namespace, &ds.Spec.Template, ds); err != nil {
 				glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.CreationObserved(dsKey)
+				errCh <- err
 				utilruntime.HandleError(err)
 			}
 		}(i)
@@ -548,11 +570,20 @@ func (dsc *DaemonSetsController) manage(ds *extensions.DaemonSet) {
 			if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[ix], ds); err != nil {
 				glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
 				dsc.expectations.DeletionObserved(dsKey)
+				errCh <- err
 				utilruntime.HandleError(err)
 			}
 		}(i)
 	}
 	deleteWait.Wait()
+
+	// Collect errors, if any, for proper reporting/retry logic in the controller.
+	errors := []error{}
+	close(errCh)
+	for err := range errCh {
+		errors = append(errors, err)
+	}
+	return utilerrors.NewAggregate(errors)
 }
 
 func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds *extensions.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
@@ -582,18 +613,16 @@ func storeDaemonSetStatus(dsClient unversionedextensions.DaemonSetInterface, ds
 	return updateErr
 }
 
-func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) {
+func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet) error {
 	glog.V(4).Infof("Updating daemon set status")
 	nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
 	if err != nil {
-		glog.Errorf("Error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
-		return
+		return fmt.Errorf("error getting node to daemon pod mapping for daemon set %#v: %v", ds, err)
 	}
 
 	nodeList, err := dsc.nodeStore.List()
 	if err != nil {
-		glog.Errorf("Couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
-		return
+		return fmt.Errorf("couldn't get list of nodes when updating daemon set %#v: %v", ds, err)
 	}
 
 	var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
@@ -616,8 +645,10 @@ func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *extensions.DaemonSet)
 
 	err = storeDaemonSetStatus(dsc.kubeClient.Extensions().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled)
 	if err != nil {
-		glog.Errorf("Error storing status for daemon set %#v: %v", ds, err)
+		return fmt.Errorf("error storing status for daemon set %#v: %v", ds, err)
 	}
+
+	return nil
 }
 
 func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
@@ -626,19 +657,9 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 		glog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	if !dsc.podStoreSynced() {
-		// Sleep so we give the pod reflector goroutine a chance to run.
-		time.Sleep(PodStoreSyncedPollPeriod)
-		glog.Infof("Waiting for pods controller to sync, requeuing ds %v", key)
-		dsc.queue.Add(key)
-		return nil
-	}
-
 	obj, exists, err := dsc.dsStore.Store.GetByKey(key)
 	if err != nil {
-		glog.Infof("Unable to retrieve ds %v from store: %v", key, err)
-		dsc.queue.Add(key)
-		return err
+		return fmt.Errorf("unable to retrieve ds %v from store: %v", key, err)
 	}
 	if !exists {
 		glog.V(3).Infof("daemon set has been deleted %v", key)
@@ -658,16 +679,16 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 	// then we do not want to call manage on foo until the daemon pods have been created.
 	dsKey, err := controller.KeyFunc(ds)
 	if err != nil {
-		glog.Errorf("Couldn't get key for object %#v: %v", ds, err)
-		return err
+		return fmt.Errorf("couldn't get key for object %#v: %v", ds, err)
 	}
 	dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey)
 	if dsNeedsSync && ds.DeletionTimestamp == nil {
-		dsc.manage(ds)
+		if err := dsc.manage(ds); err != nil {
+			return err
+		}
 	}
 
-	dsc.updateDaemonSetStatus(ds)
-	return nil
+	return dsc.updateDaemonSetStatus(ds)
}
 
 func (dsc *DaemonSetsController) nodeShouldRunDaemonPod(node *api.Node, ds *extensions.DaemonSet) bool {
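For context, a minimal standalone sketch of the Get/Done/Forget/AddRateLimited contract that processNextWorkItem now follows. It assumes only the workqueue package already imported by the controller; the key and syncFn below are made up for illustration:

    package main

    import (
    	"fmt"

    	"k8s.io/kubernetes/pkg/util/workqueue"
    )

    func main() {
    	// Same constructor the controller now uses: failed keys are retried with backoff.
    	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "example")
    	queue.Add("default/my-daemonset")

    	// Stand-in for dsc.syncHandler.
    	syncFn := func(key string) error { return fmt.Errorf("transient failure") }

    	key, quit := queue.Get()
    	if quit {
    		return
    	}
    	// Done must always be called so the key can be processed again later.
    	defer queue.Done(key)

    	if err := syncFn(key.(string)); err != nil {
    		// Requeue with rate limiting; repeated failures back off further.
    		queue.AddRateLimited(key)
    		return
    	}
    	// Success: clear the rate limiter's failure history for this key.
    	queue.Forget(key)
    }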
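And a sketch of the fan-out/collect pattern manage now uses: an error channel buffered to the total number of goroutines so sends never block, closed only after the WaitGroups finish, then drained into a single aggregate error. The node names and failure are illustrative:

    package main

    import (
    	"fmt"
    	"sync"

    	utilerrors "k8s.io/kubernetes/pkg/util/errors"
    )

    func main() {
    	nodes := []string{"node-a", "node-b", "node-c"}
    	// Buffer sized to the number of potential senders, so no goroutine blocks.
    	errCh := make(chan error, len(nodes))

    	var wg sync.WaitGroup
    	wg.Add(len(nodes))
    	for _, node := range nodes {
    		go func(node string) {
    			defer wg.Done()
    			if node == "node-b" { // pretend one creation fails
    				errCh <- fmt.Errorf("failed to create pod on %s", node)
    			}
    		}(node)
    	}
    	wg.Wait()

    	// Close only after all senders are done, then drain.
    	close(errCh)
    	errs := []error{}
    	for err := range errCh {
    		errs = append(errs, err)
    	}
    	// NewAggregate returns nil for an empty slice, so the happy path stays error-free.
    	fmt.Println(utilerrors.NewAggregate(errs))
    }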
diff --git a/pkg/controller/daemon/daemoncontroller_test.go b/pkg/controller/daemon/daemoncontroller_test.go
index eec7dbe5c86..d7e5a0fe8e5 100644
--- a/pkg/controller/daemon/daemoncontroller_test.go
+++ b/pkg/controller/daemon/daemoncontroller_test.go
@@ -138,6 +138,7 @@ func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
 	manager := NewDaemonSetsControllerFromClient(clientset, controller.NoResyncPeriodFunc, 0)
 	manager.podStoreSynced = alwaysReady
+	manager.nodeStoreSynced = alwaysReady
 	podControl := &controller.FakePodControl{}
 	manager.podControl = podControl
 	return manager, podControl
@@ -539,28 +540,6 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
 }
 
-func TestDSManagerNotReady(t *testing.T) {
-	manager, podControl := newTestController()
-	manager.podStoreSynced = func() bool { return false }
-	addNodes(manager.nodeStore.Store, 0, 1, nil)
-
-	// Simulates the ds reflector running before the pod reflector. We don't
-	// want to end up creating daemon pods in this case until the pod reflector
-	// has synced, so the ds manager should just requeue the ds.
-	ds := newDaemonSet("foo")
-	manager.dsStore.Add(ds)
-
-	dsKey := getKey(ds, t)
-	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
-	queueDS, _ := manager.queue.Get()
-	if queueDS != dsKey {
-		t.Fatalf("Expected to find key %v in queue, found %v", dsKey, queueDS)
-	}
-
-	manager.podStoreSynced = alwaysReady
-	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
-}
-
 // Daemon with node affinity should launch pods on nodes matching affinity.
 func TestNodeAffinityDaemonLaunchesPods(t *testing.T) {
 	manager, podControl := newTestController()
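The tests inject nodeStoreSynced the same way they already inject podStoreSynced. alwaysReady is defined elsewhere in this test file and is not part of the diff; presumably it is along these lines:

    // Assumed shape of the test helper: it satisfies framework.InformerSynced (func() bool).
    var alwaysReady = func() bool { return true }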
diff --git a/pkg/controller/framework/shared_informer.go b/pkg/controller/framework/shared_informer.go
index 87dcac6d22b..c5807054bac 100644
--- a/pkg/controller/framework/shared_informer.go
+++ b/pkg/controller/framework/shared_informer.go
@@ -21,9 +21,12 @@ import (
 	"sync"
 	"time"
 
+	"github.com/golang/glog"
+
 	"k8s.io/kubernetes/pkg/client/cache"
 	"k8s.io/kubernetes/pkg/runtime"
 	utilruntime "k8s.io/kubernetes/pkg/util/runtime"
+	"k8s.io/kubernetes/pkg/util/wait"
 )
 
 // if you use this, there is one behavior change compared to a standard Informer.
@@ -75,6 +78,34 @@ func NewSharedIndexInformer(lw cache.ListerWatcher, objType runtime.Object, resy
 	return sharedIndexInformer
 }
 
+// InformerSynced is a function that can be used to determine if an informer has synced. This is useful for determining if caches have synced.
+type InformerSynced func() bool
+
+// syncedPollPeriod controls how often you look at the status of your sync funcs.
+const syncedPollPeriod = 100 * time.Millisecond
+
+// WaitForCacheSync waits for caches to populate. It returns true if it was successful, false
+// if the controller should shut down.
+func WaitForCacheSync(stopCh <-chan struct{}, cacheSyncs ...InformerSynced) bool {
+	err := wait.PollUntil(syncedPollPeriod,
+		func() (bool, error) {
+			for _, syncFunc := range cacheSyncs {
+				if !syncFunc() {
+					return false, nil
+				}
+			}
+			return true, nil
+		},
+		stopCh)
+	if err != nil {
+		glog.V(2).Infof("stop requested")
+		return false
+	}
+
+	glog.V(4).Infof("caches populated")
+	return true
+}
+
 type sharedIndexInformer struct {
 	indexer    cache.Indexer
 	controller *Controller
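A minimal sketch of WaitForCacheSync used outside any controller, with a plain closure standing in for an informer's HasSynced method; the timings are arbitrary:

    package main

    import (
    	"fmt"
    	"time"

    	"k8s.io/kubernetes/pkg/controller/framework"
    )

    func main() {
    	stopCh := make(chan struct{})
    	start := time.Now()

    	// Pretend the cache becomes ready after 300ms; a real caller passes an
    	// informer's HasSynced here, as the daemon set controller does above.
    	synced := framework.InformerSynced(func() bool {
    		return time.Since(start) > 300*time.Millisecond
    	})

    	// Closing stopCh is the only way to give up; do so after 5s.
    	go func() {
    		time.Sleep(5 * time.Second)
    		close(stopCh)
    	}()

    	if !framework.WaitForCacheSync(stopCh, synced) {
    		fmt.Println("stop requested before caches synced")
    		return
    	}
    	fmt.Println("caches synced; safe to start workers")
    }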
diff --git a/pkg/util/wait/wait.go b/pkg/util/wait/wait.go
index 517577c8c8b..b58a6288d0f 100644
--- a/pkg/util/wait/wait.go
+++ b/pkg/util/wait/wait.go
@@ -186,7 +186,12 @@ func pollImmediateInternal(wait WaitFunc, condition ConditionFunc) error {
 func PollInfinite(interval time.Duration, condition ConditionFunc) error {
 	done := make(chan struct{})
 	defer close(done)
-	return WaitFor(poller(interval, 0), condition, done)
+	return PollUntil(interval, condition, done)
+}
+
+// PollUntil is like Poll, but it takes a stop channel instead of a total duration.
+func PollUntil(interval time.Duration, condition ConditionFunc, stopCh <-chan struct{}) error {
+	return WaitFor(poller(interval, 0), condition, stopCh)
 }
 
 // WaitFunc creates a channel that receives an item every time a test
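A small sketch of the new PollUntil from a caller's point of view: the condition is retried on the interval until it returns true, returns an error, or the stop channel closes. The values below are arbitrary:

    package main

    import (
    	"fmt"
    	"time"

    	"k8s.io/kubernetes/pkg/util/wait"
    )

    func main() {
    	stopCh := make(chan struct{})

    	// The caller, not PollUntil, decides when to give up.
    	go func() {
    		time.Sleep(2 * time.Second)
    		close(stopCh)
    	}()

    	attempts := 0
    	err := wait.PollUntil(100*time.Millisecond, func() (bool, error) {
    		attempts++
    		return attempts >= 3, nil // succeed on the third try
    	}, stopCh)
    	fmt.Printf("attempts=%d err=%v\n", attempts, err)
    }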
diff --git a/pkg/util/wait/wait_test.go b/pkg/util/wait/wait_test.go
index d9d02ec0ea1..e2ff86041aa 100644
--- a/pkg/util/wait/wait_test.go
+++ b/pkg/util/wait/wait_test.go
@@ -432,3 +432,33 @@ func TestWaitForWithDelay(t *testing.T) {
 		t.Errorf("expected an ack of the done signal.")
 	}
 }
+
+func TestPollUntil(t *testing.T) {
+	stopCh := make(chan struct{})
+	called := make(chan bool)
+	pollDone := make(chan struct{})
+
+	go func() {
+		PollUntil(time.Microsecond, ConditionFunc(func() (bool, error) {
+			called <- true
+			return false, nil
+		}), stopCh)
+
+		close(pollDone)
+	}()
+
+	// Make sure we're called once.
+	<-called
+	// This should trigger a "done".
+	close(stopCh)
+
+	go func() {
+		// Release the condition func, if needed.
+		for {
+			<-called
+		}
+	}()
+
+	// Make sure we finished the poll.
+	<-pollDone
+}