Merge pull request #14896 from davidopp/master

Fix race condition in DaemonSet controller. Fixes #14693.
Alex Robinson 2015-10-05 12:53:23 -07:00
commit 53067d0978
2 changed files with 55 additions and 0 deletions
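
In brief: on startup the controller could run a sync pass before the pod reflector had finished its initial list, so the still-empty pod store made a daemon set look as though it had no running pods, and the controller could act on that stale view. The change guards syncDaemonSet behind a podStoreSynced check and requeues the daemon set until the store is populated. A minimal, self-contained sketch of that pattern, using illustrative names rather than the controller's real types:

package main

import (
	"fmt"
	"time"
)

// pollPeriod plays the role of PodStoreSyncedPollPeriod below.
const pollPeriod = 100 * time.Millisecond

// toyController stands in for DaemonSetsController: the sync check is a plain
// func field, so production code can wire in the informer's HasSynced while
// tests inject a stub (compare alwaysReady in the test file).
type toyController struct {
	storeSynced func() bool
	queue       chan string
}

// sync mirrors the guard added to syncDaemonSet: if the store has not synced
// yet, sleep briefly to avoid a hot loop, requeue the key, and report success
// rather than acting on an empty cache.
func (c *toyController) sync(key string) {
	if !c.storeSynced() {
		time.Sleep(pollPeriod)
		fmt.Printf("store not synced, requeuing %s\n", key)
		c.queue <- key
		return
	}
	fmt.Printf("store synced, processing %s\n", key)
}

func main() {
	c := &toyController{
		storeSynced: func() bool { return false },
		queue:       make(chan string, 1),
	}
	c.sync("default/foo") // first pass: store empty, key is requeued
	c.storeSynced = func() bool { return true }
	c.sync(<-c.queue) // second pass: store synced, key is processed
}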

Changed file 1 of 2 (DaemonSet controller):

@@ -40,10 +40,17 @@ import (
const (
// Daemon sets will periodically check that their daemon pods are running as expected.
FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.
// Nodes don't need relisting.
FullNodeResyncPeriod = 0
// Daemon pods don't need relisting.
FullDaemonPodResyncPeriod = 0
// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
// If sending a status update to API server fails, we retry a finite number of times.
StatusUpdateRetries = 1
)
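With the 100 ms sleep before each requeue, a daemon set that is waiting on the pod store gets re-checked at most about ten times per second, which keeps the wait cheap without introducing a separate polling loop.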
@@ -70,6 +77,10 @@ type DaemonSetsController struct {
podController *framework.Controller
// Watches changes to all nodes.
nodeController *framework.Controller
// podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing.
podStoreSynced func() bool
// Daemon sets that need to be synced.
queue *workqueue.Type
}
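Keeping podStoreSynced as a func() bool field, instead of calling podController.HasSynced directly at the call site, is what lets the tests below swap in a stub (alwaysReady, or the always-false stub in TestDSManagerNotReady).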
@@ -155,11 +166,13 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController
},
)
dsc.syncHandler = dsc.syncDaemonSet
dsc.podStoreSynced = dsc.podController.HasSynced
return dsc
}
// Run begins watching and syncing daemon sets.
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go dsc.dsController.Run(stopCh)
go dsc.podController.Run(stopCh)
go dsc.nodeController.Run(stopCh)
@@ -205,6 +218,8 @@ func (dsc *DaemonSetsController) enqueueDaemonSet(obj interface{}) {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
return
}
// TODO: Handle overlapping controllers better. See comment in ReplicationManager.
dsc.queue.Add(key)
}
@@ -467,6 +482,13 @@ func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
return nil
}
ds := obj.(*experimental.DaemonSet)
if !dsc.podStoreSynced() {
// Sleep so we give the pod reflector goroutine a chance to run.
time.Sleep(PodStoreSyncedPollPeriod)
glog.Infof("Waiting for pods controller to sync, requeuing ds %v", ds.Name)
dsc.enqueueDaemonSet(ds)
return nil
}
// Don't process a daemon set until all its creations and deletions have been processed.
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
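Note that the unsynced path returns nil rather than an error: the requeue is explicit (enqueueDaemonSet after the sleep), so an unsynced store is treated as a normal wait rather than a sync failure.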

Changed file 2 of 2 (DaemonSet controller tests):

@@ -35,12 +35,22 @@ var (
simpleDaemonSetLabel2 = map[string]string{"name": "simple-daemon", "type": "test"}
simpleNodeLabel = map[string]string{"color": "blue", "speed": "fast"}
simpleNodeLabel2 = map[string]string{"color": "red", "speed": "fast"}
alwaysReady = func() bool { return true }
)
func init() {
api.ForTesting_ReferencesAllowBlankSelfLinks = true
}
func getKey(ds *experimental.DaemonSet, t *testing.T) string {
if key, err := controller.KeyFunc(ds); err != nil {
t.Errorf("Unexpected error getting key for ds %v: %v", ds.Name, err)
return ""
} else {
return key
}
}
func newDaemonSet(name string) *experimental.DaemonSet {
return &experimental.DaemonSet{
TypeMeta: unversioned.TypeMeta{APIVersion: testapi.Experimental.Version()},
@@ -121,6 +131,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()})
manager := NewDaemonSetsController(client)
manager.podStoreSynced = alwaysReady
podControl := &controller.FakePodControl{}
manager.podControl = podControl
return manager, podControl
@@ -282,3 +293,25 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
}
func TestDSManagerNotReady(t *testing.T) {
manager, podControl := newTestController()
manager.podStoreSynced = func() bool { return false }
addNodes(manager.nodeStore.Store, 0, 1, nil)
// Simulates the ds reflector running before the pod reflector. We don't
// want to end up creating daemon pods in this case until the pod reflector
// has synced, so the ds manager should just requeue the ds.
ds := newDaemonSet("foo")
manager.dsStore.Add(ds)
dsKey := getKey(ds, t)
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
queueDS, _ := manager.queue.Get()
if queueDS != dsKey {
t.Fatalf("Expected to find key %v in queue, found %v", dsKey, queueDS)
}
manager.podStoreSynced = alwaysReady
syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
}
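The test exercises both sides of the guard: while podStoreSynced reports false, the sync creates no pods and puts the daemon set's key back on the queue; once it is flipped to alwaysReady, the same daemon set yields exactly one pod creation for the single node.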