diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go
index f55307432a3..b4fb9e7f2f8 100644
--- a/pkg/controller/daemon/controller.go
+++ b/pkg/controller/daemon/controller.go
@@ -40,10 +40,17 @@ import (
 const (
 	// Daemon sets will periodically check that their daemon pods are running as expected.
 	FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.
 
+	// Nodes don't need relisting.
 	FullNodeResyncPeriod = 0
 
+	// Daemon pods don't need relisting.
 	FullDaemonPodResyncPeriod = 0
+
+	// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
+	// avoid a hot loop, we'll wait this long between checks.
+	PodStoreSyncedPollPeriod = 100 * time.Millisecond
+
 	// If sending a status update to the API server fails, we retry a finite number of times.
 	StatusUpdateRetries = 1
 )
@@ -70,6 +77,10 @@ type DaemonSetsController struct {
 	podController *framework.Controller
 	// Watches changes to all nodes.
 	nodeController *framework.Controller
+	// podStoreSynced returns true if the pod store has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	podStoreSynced func() bool
+
 	// Daemon sets that need to be synced.
 	queue *workqueue.Type
 }
@@ -155,11 +166,13 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController
 		},
 	)
 	dsc.syncHandler = dsc.syncDaemonSet
+	dsc.podStoreSynced = dsc.podController.HasSynced
 	return dsc
 }
 
 // Run begins watching and syncing daemon sets.
 func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
+	defer util.HandleCrash()
 	go dsc.dsController.Run(stopCh)
 	go dsc.podController.Run(stopCh)
 	go dsc.nodeController.Run(stopCh)
@@ -205,6 +218,8 @@ func (dsc *DaemonSetsController) enqueueDaemonSet(obj interface{}) {
 		glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
 		return
 	}
+
+	// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
 	dsc.queue.Add(key)
 }
 
@@ -467,6 +482,13 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
 		return nil
 	}
 	ds := obj.(*experimental.DaemonSet)
+	if !dsc.podStoreSynced() {
+		// Sleep so we give the pod reflector goroutine a chance to run.
+		time.Sleep(PodStoreSyncedPollPeriod)
+		glog.Infof("Waiting for pods controller to sync, requeuing ds %v", ds.Name)
+		dsc.enqueueDaemonSet(ds)
+		return nil
+	}
 	// Don't process a daemon set until all its creations and deletions have been processed.
 	// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go
index fb9abeadd2d..8298bee3ebc 100644
--- a/pkg/controller/daemon/controller_test.go
+++ b/pkg/controller/daemon/controller_test.go
@@ -35,12 +35,22 @@ var (
 	simpleDaemonSetLabel2 = map[string]string{"name": "simple-daemon", "type": "test"}
 	simpleNodeLabel       = map[string]string{"color": "blue", "speed": "fast"}
 	simpleNodeLabel2      = map[string]string{"color": "red", "speed": "fast"}
+	alwaysReady           = func() bool { return true }
 )
 
 func init() {
 	api.ForTesting_ReferencesAllowBlankSelfLinks = true
 }
 
+func getKey(ds *experimental.DaemonSet, t *testing.T) string {
+	if key, err := controller.KeyFunc(ds); err != nil {
+		t.Errorf("Unexpected error getting key for ds %v: %v", ds.Name, err)
+		return ""
+	} else {
+		return key
+	}
+}
+
 func newDaemonSet(name string) *experimental.DaemonSet {
 	return &experimental.DaemonSet{
 		TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Experimental.Version()},
@@ -121,6 +131,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
 func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()})
 	manager := NewDaemonSetsController(client)
+	manager.podStoreSynced = alwaysReady
 	podControl := &controller.FakePodControl{}
 	manager.podControl = podControl
 	return manager, podControl
@@ -282,3 +293,25 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
 	manager.dsStore.Add(ds)
 	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
 }
+
+func TestDSManagerNotReady(t *testing.T) {
+	manager, podControl := newTestController()
+	manager.podStoreSynced = func() bool { return false }
+	addNodes(manager.nodeStore.Store, 0, 1, nil)
+
+	// Simulates the ds reflector running before the pod reflector. We don't
+	// want to end up creating daemon pods in this case until the pod reflector
+	// has synced, so the ds manager should just requeue the ds.
+	ds := newDaemonSet("foo")
+	manager.dsStore.Add(ds)
+
+	dsKey := getKey(ds, t)
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
+	queueDS, _ := manager.queue.Get()
+	if queueDS != dsKey {
+		t.Fatalf("Expected to find key %v in queue, found %v", dsKey, queueDS)
+	}
+
+	manager.podStoreSynced = alwaysReady
+	syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
+}
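The guard this patch adds to syncDaemonSet is an instance of a general controller pattern: never act on the contents of a locally cached store until its reflector has completed at least one full list, because an empty-looking cache would make the controller create duplicate daemon pods. The sketch below is a minimal, self-contained Go illustration of that pattern, not the Kubernetes implementation; the syncer type and its storeSynced, requeue, and sync names are hypothetical stand-ins for the real podStoreSynced, enqueueDaemonSet, and syncDaemonSet.

package main

import (
	"fmt"
	"time"
)

// pollPeriod plays the role of PodStoreSyncedPollPeriod in the patch: how
// long to sleep before requeuing, so an unsynced store doesn't hot-loop.
const pollPeriod = 100 * time.Millisecond

// syncer is an illustrative stand-in for a controller. storeSynced is a
// plain func field so tests can inject either answer, just as the patch
// makes podStoreSynced a struct member for injection.
type syncer struct {
	storeSynced func() bool       // e.g. an informer's HasSynced
	requeue     func(key string)  // puts the key back on the work queue
}

// sync refuses to make decisions based on store contents until the store
// has synced at least once; until then it backs off briefly and requeues.
func (s *syncer) sync(key string) error {
	if !s.storeSynced() {
		// Give the reflector goroutine a chance to run, then retry later.
		time.Sleep(pollPeriod)
		fmt.Printf("store not synced, requeuing %v\n", key)
		s.requeue(key)
		return nil
	}
	// Safe to count objects in the store from here on.
	fmt.Printf("syncing %v with a warm cache\n", key)
	return nil
}

func main() {
	synced := false
	s := &syncer{
		storeSynced: func() bool { return synced },
		requeue:     func(key string) {},
	}
	s.sync("default/foo") // cache cold: requeued, nothing created
	synced = true
	s.sync("default/foo") // cache warm: processed normally
}

Note that the guarded branch returns nil rather than an error, mirroring the patch: the work queue treats the item as handled, and the re-added key is simply picked up on a later pass once the sync check starts returning true. TestDSManagerNotReady exercises exactly this: it asserts the key lands back on the queue while the store is cold, then flips podStoreSynced to alwaysReady and asserts one daemon pod is created.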