From d802778c209b1996f5a7066993698aa52c9dd128 Mon Sep 17 00:00:00 2001 From: mqliang Date: Thu, 11 Feb 2016 11:43:49 +0800 Subject: [PATCH 1/2] Revert "Sync all pods with store before starting RC Manager." This reverts commit 1b93ee7b353eb6d691a4f2c4cd9abcdd8858c468. --- pkg/controller/controller_utils.go | 34 ------------------- .../replication/replication_controller.go | 1 - .../replication_controller_test.go | 24 ------------- 3 files changed, 59 deletions(-) diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index a4206d8bd35..9d573560e78 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -31,7 +31,6 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/controller/framework" - "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/runtime" ) @@ -433,39 +432,6 @@ func FilterActivePods(pods []api.Pod) []*api.Pod { return result } -// SyncAllPodsWithStore lists all pods and inserts them into the given store. -// Though this function is written in a generic manner, it is only used by the -// controllers for a specific purpose, to synchronously populate the store -// with the first batch of pods that would otherwise be sent by the Informer. -// Doing this avoids more complicated forms of synchronization with the -// Informer, though it also means that the controller calling this function -// will receive "OnUpdate" events for all the pods in the store, instead of -// "OnAdd". This should be ok, since most controllers are level triggered -// and make decisions based on the contents of the store. -// -// TODO: Extend this logic to load arbitrary local state for the controllers -// instead of just pods. -func SyncAllPodsWithStore(kubeClient clientset.Interface, store cache.Store) { - var allPods *api.PodList - var err error - listOptions := api.ListOptions{LabelSelector: labels.Everything(), FieldSelector: fields.Everything()} - for { - if allPods, err = kubeClient.Core().Pods(api.NamespaceAll).List(listOptions); err != nil { - glog.Warningf("Retrying pod list: %v", err) - continue - } - break - } - pods := []interface{}{} - for i := range allPods.Items { - p := allPods.Items[i] - glog.V(4).Infof("Initializing store with pod %v/%v", p.Namespace, p.Name) - pods = append(pods, &p) - } - store.Replace(pods, allPods.ResourceVersion) - return -} - // ControllersByCreationTimestamp sorts a list of ReplicationControllers by creation timestamp, using their names as a tie breaker. type ControllersByCreationTimestamp []*api.ReplicationController diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index b757d3bc0f4..73ddf019c0d 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -187,7 +187,6 @@ func (rm *ReplicationManager) SetEventRecorder(recorder record.EventRecorder) { func (rm *ReplicationManager) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() glog.Infof("Starting RC Manager") - controller.SyncAllPodsWithStore(rm.kubeClient, rm.podStore.Store) go rm.rcController.Run(stopCh) go rm.podController.Run(stopCh) for i := 0; i < workers; i++ { diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index b5361d73bee..7ea9b3c3ec8 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -892,27 +892,3 @@ func TestOverlappingRCs(t *testing.T) { } } } - -func TestRCManagerInit(t *testing.T) { - // Insert a stable rc into the replication manager's store and make sure - // it syncs pods with the apiserver before making any decisions. - rc := newReplicationController(2) - response := runtime.EncodeOrDie(testapi.Default.Codec(), newPodList(nil, 2, api.PodRunning, rc)) - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: response, - } - testServer := httptest.NewServer(&fakeHandler) - // TODO: Uncomment when fix #19254 - // defer testServer.Close() - - c := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewReplicationManager(c, controller.NoResyncPeriodFunc, BurstReplicas) - manager.rcStore.Store.Add(rc) - manager.podStoreSynced = alwaysReady - controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store) - fakePodControl := &controller.FakePodControl{} - manager.podControl = fakePodControl - manager.syncReplicationController(getKey(rc, t)) - validateSyncReplication(t, fakePodControl, 0, 0) -} From 91124afdd7dc17500fe35acaa638c2210af1122a Mon Sep 17 00:00:00 2001 From: mqliang Date: Thu, 11 Feb 2016 11:46:16 +0800 Subject: [PATCH 2/2] Revert "Sync pods for daemon sets." This reverts commit ffd34311c69b67df007af813476f80faca2aa53b. --- pkg/controller/daemon/controller.go | 2 +- pkg/controller/daemon/controller_test.go | 39 ------------------------ 2 files changed, 1 insertion(+), 40 deletions(-) diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index 7fbff804288..fad8f721f53 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -187,7 +187,6 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() glog.Infof("Starting Daemon Sets controller manager") - controller.SyncAllPodsWithStore(dsc.kubeClient, dsc.podStore.Store) go dsc.dsController.Run(stopCh) go dsc.podController.Run(stopCh) go dsc.nodeController.Run(stopCh) @@ -472,6 +471,7 @@ func storeDaemonSetStatus(dsClient unversioned_extensions.DaemonSetInterface, ds if ds.Status.DesiredNumberScheduled == desiredNumberScheduled && ds.Status.CurrentNumberScheduled == currentNumberScheduled && ds.Status.NumberMisscheduled == numberMisscheduled { return nil } + var updateErr, getErr error for i := 0; i <= StatusUpdateRetries; i++ { ds.Status.DesiredNumberScheduled = desiredNumberScheduled diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go index 77df3394f81..8a0bf55c711 100644 --- a/pkg/controller/daemon/controller_test.go +++ b/pkg/controller/daemon/controller_test.go @@ -18,7 +18,6 @@ package daemon import ( "fmt" - "net/http/httptest" "testing" "k8s.io/kubernetes/pkg/api" @@ -30,9 +29,7 @@ import ( clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/securitycontext" - utiltesting "k8s.io/kubernetes/pkg/util/testing" ) var ( @@ -457,39 +454,3 @@ func TestDSManagerNotReady(t *testing.T) { manager.podStoreSynced = alwaysReady syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0) } - -func TestDSManagerInit(t *testing.T) { - // Insert a stable daemon set and make sure we don't create an extra pod - // for the one node which already has a daemon after a simulated restart. - ds := newDaemonSet("test") - ds.Status = extensions.DaemonSetStatus{ - CurrentNumberScheduled: 1, - NumberMisscheduled: 0, - DesiredNumberScheduled: 1, - } - nodeName := "only-node" - podList := &api.PodList{ - Items: []api.Pod{ - *newPod("podname", nodeName, simpleDaemonSetLabel), - }} - response := runtime.EncodeOrDie(testapi.Default.Codec(), podList) - fakeHandler := utiltesting.FakeHandler{ - StatusCode: 200, - ResponseBody: response, - } - testServer := httptest.NewServer(&fakeHandler) - // TODO: Uncomment when fix #19254 - // defer testServer.Close() - - clientset := clientset.NewForConfigOrDie(&client.Config{Host: testServer.URL, ContentConfig: client.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}}) - manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc) - manager.dsStore.Add(ds) - manager.nodeStore.Add(newNode(nodeName, nil)) - manager.podStoreSynced = alwaysReady - controller.SyncAllPodsWithStore(manager.kubeClient, manager.podStore.Store) - - fakePodControl := &controller.FakePodControl{} - manager.podControl = fakePodControl - manager.syncHandler(getKey(ds, t)) - validateSyncDaemonSets(t, fakePodControl, 0, 0) -}