From 4fdd5bc3f3c7d25458258829bb0cb86cf222921d Mon Sep 17 00:00:00 2001
From: Prashanth Balasubramanian
Date: Wed, 6 May 2015 14:39:14 -0700
Subject: [PATCH] Ratelimit replica creation

---
 cmd/integration/integration.go                 |   2 +-
 .../app/controllermanager.go                   |   2 +-
 cmd/kubernetes/kubernetes.go                   |   2 +-
 .../nodecontroller/nodecontroller.go           |   2 +-
 pkg/controller/controller_utils.go             |   2 +-
 pkg/controller/replication_controller.go       |  35 +++--
 pkg/controller/replication_controller_test.go  | 123 ++++++++++++++++--
 7 files changed, 142 insertions(+), 26 deletions(-)

diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go
index 4b67b3688d9..41de489aef5 100644
--- a/cmd/integration/integration.go
+++ b/cmd/integration/integration.go
@@ -214,7 +214,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
 	// ensure the service endpoints are sync'd several times within the window that the integration tests wait
 	go endpoints.Run(3, util.NeverStop)
 
-	controllerManager := replicationControllerPkg.NewReplicationManager(cl)
+	controllerManager := replicationControllerPkg.NewReplicationManager(cl, replicationControllerPkg.BurstReplicas)
 
 	// TODO: Write an integration test for the replication controllers watch.
 	go controllerManager.Run(3, util.NeverStop)
diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go
index ac8fcef9b48..e7964f7b5b5 100644
--- a/cmd/kube-controller-manager/app/controllermanager.go
+++ b/cmd/kube-controller-manager/app/controllermanager.go
@@ -213,7 +213,7 @@ func (s *CMServer) Run(_ []string) error {
 	endpoints := service.NewEndpointController(kubeClient)
 	go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
 
-	controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
+	controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas)
 	go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop)
 
 	cloud := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go
index 87c7cdf4ebf..6c5fb874f14 100644
--- a/cmd/kubernetes/kubernetes.go
+++ b/cmd/kubernetes/kubernetes.go
@@ -143,7 +143,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
 	endpoints := service.NewEndpointController(cl)
 	go endpoints.Run(5, util.NeverStop)
 
-	controllerManager := controller.NewReplicationManager(cl)
+	controllerManager := controller.NewReplicationManager(cl, controller.BurstReplicas)
 	go controllerManager.Run(5, util.NeverStop)
 }
 
diff --git a/pkg/cloudprovider/nodecontroller/nodecontroller.go b/pkg/cloudprovider/nodecontroller/nodecontroller.go
index 81cc159f7a2..b58bda2cd7d 100644
--- a/pkg/cloudprovider/nodecontroller/nodecontroller.go
+++ b/pkg/cloudprovider/nodecontroller/nodecontroller.go
@@ -524,7 +524,7 @@ func (nc *NodeController) tryUpdateNodeStatus(node *api.Node) (time.Duration, ap
 	// NodeReady condition was last set longer ago than gracePeriod, so update it to Unknown
 	// (regardless of its current value) in the master, without contacting kubelet.
 	if readyCondition == nil {
-		glog.V(2).Infof("node %v is never updated by kubelet")
+		glog.V(2).Infof("node %v is never updated by kubelet", node.Name)
 		node.Status.Conditions = append(node.Status.Conditions, api.NodeCondition{
 			Type:   api.NodeReady,
 			Status: api.ConditionUnknown,
diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index 0c93d05ec54..0231bc0735b 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -84,7 +84,7 @@ func (r *RCExpectations) SatisfiedExpectations(rc *api.ReplicationController) bo
 		if podExp.Fulfilled() {
 			return true
 		} else {
-			glog.V(4).Infof("Controller %v still waiting on expectations %#v", podExp)
+			glog.V(4).Infof("Controller still waiting on expectations %#v", podExp)
 			return false
 		}
 	} else if err != nil {
diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go
index 1efb02ca41d..b23cd4882a8 100644
--- a/pkg/controller/replication_controller.go
+++ b/pkg/controller/replication_controller.go
@@ -58,12 +58,15 @@ const (
 	// of expectations, without it the RC could stay asleep forever. This should
 	// be set based on the expected latency of watch events.
 	//
-	// TODO: Set this per expectation, based on its size.
 	// Currently an rc can service (create *and* observe the watch events for said
-	// creation) about 10-20 pods a second, so it takes about 3.5 min to service
-	// 3000 pods. Just creation is limited to 30qps, and watching happens with
-	// ~10-30s latency/pod at scale.
-	ExpectationsTimeout = 6 * time.Minute
+	// creation) about 10-20 pods a second, so it takes about 1 min to service
+	// 500 pods. Just creation is limited to 20qps, and watching happens with ~10-30s
+	// latency/pod at the scale of 3000 pods over 100 nodes.
+	ExpectationsTimeout = 3 * time.Minute
+
+	// Realistic value of the burstReplicas field for the replication manager, based on
+	// performance requirements for kubernetes 1.0.
+	BurstReplicas = 500
 )
 
 // ReplicationManager is responsible for synchronizing ReplicationController objects stored
@@ -72,6 +75,9 @@ type ReplicationManager struct {
 	kubeClient client.Interface
 	podControl PodControlInterface
 
+	// An rc is temporarily suspended after creating/deleting this many replicas.
+	// It resumes normal action after observing the watch events for them.
+	burstReplicas int
 	// To allow injection of syncReplicationController for testing.
 	syncHandler func(rcKey string) error
 	// A TTLCache of pod creates/deletes each rc expects to see
@@ -89,7 +95,7 @@ type ReplicationManager struct {
 }
 
 // NewReplicationManager creates a new ReplicationManager.
-func NewReplicationManager(kubeClient client.Interface) *ReplicationManager {
+func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *ReplicationManager {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
 
@@ -99,8 +105,9 @@ func NewReplicationManager(kubeClient client.Interface) *ReplicationManager {
 			kubeClient: kubeClient,
 			recorder:   eventBroadcaster.NewRecorder(api.EventSource{Component: "replication-controller"}),
 		},
-		expectations: NewRCExpectations(),
-		queue:        workqueue.New(),
+		burstReplicas: burstReplicas,
+		expectations:  NewRCExpectations(),
+		queue:         workqueue.New(),
 	}
 
 	rm.controllerStore.Store, rm.rcController = framework.NewInformer(
@@ -277,15 +284,19 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller
 	diff := len(filteredPods) - controller.Spec.Replicas
 	if diff < 0 {
 		diff *= -1
+		if diff > rm.burstReplicas {
+			diff = rm.burstReplicas
+		}
 		rm.expectations.ExpectCreations(controller, diff)
 		wait := sync.WaitGroup{}
 		wait.Add(diff)
-		glog.V(2).Infof("Too few %q replicas, creating %d", controller.Name, diff)
+		glog.V(2).Infof("Too few %q/%q replicas, need %d, creating %d", controller.Namespace, controller.Name, controller.Spec.Replicas, diff)
 		for i := 0; i < diff; i++ {
 			go func() {
 				defer wait.Done()
 				if err := rm.podControl.createReplica(controller.Namespace, controller); err != nil {
 					// Decrement the expected number of creates because the informer won't observe this pod
+					glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", controller.Namespace, controller.Name)
 					rm.expectations.CreationObserved(controller)
 					util.HandleError(err)
 				}
@@ -293,8 +304,11 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller
 		}
 		wait.Wait()
 	} else if diff > 0 {
+		if diff > rm.burstReplicas {
+			diff = rm.burstReplicas
+		}
 		rm.expectations.ExpectDeletions(controller, diff)
-		glog.V(2).Infof("Too many %q replicas, deleting %d", controller.Name, diff)
+		glog.V(2).Infof("Too many %q/%q replicas, need %d, deleting %d", controller.Namespace, controller.Name, controller.Spec.Replicas, diff)
 		// Sort the pods in the order such that not-ready < ready, unscheduled
 		// < scheduled, and pending < running. This ensures that we delete pods
 		// in the earlier stages whenever possible.
@@ -307,6 +321,7 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*api.Pod, controller
 				defer wait.Done()
 				if err := rm.podControl.deletePod(controller.Namespace, filteredPods[ix].Name); err != nil {
 					// Decrement the expected number of deletes because the informer won't observe this deletion
+					glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", controller.Namespace, controller.Name)
 					rm.expectations.DeletionObserved(controller)
 				}
 			}(i)
diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go
index e6a83d6241f..a401f2295fb 100644
--- a/pkg/controller/replication_controller_test.go
+++ b/pkg/controller/replication_controller_test.go
@@ -225,7 +225,7 @@ func startManagerAndWait(manager *ReplicationManager, pods int, t *testing.T) ch
 func TestSyncReplicationControllerDoesNothing(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
 	fakePodControl := FakePodControl{}
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 
 	// 2 running pods, a controller with 2 replicas, sync is a no-op
 	controllerSpec := newReplicationController(2)
@@ -240,7 +240,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
 func TestSyncReplicationControllerDeletes(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
 	fakePodControl := FakePodControl{}
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 	manager.podControl = &fakePodControl
 
 	// 2 running pods and a controller with 1 replica, one pod delete expected
@@ -254,7 +254,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
 
 func TestSyncReplicationControllerCreates(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 
 	// A controller with 2 replicas and no pods in the store, 2 creates expected
 	controller := newReplicationController(2)
@@ -319,7 +319,7 @@ func TestControllerNoReplicaUpdate(t *testing.T) {
 	testServer := httptest.NewServer(&fakeHandler)
 	defer testServer.Close()
 	client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 
 	// Steady state for the replication controller, no Status.Replicas updates expected
 	activePods := 5
@@ -348,7 +348,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
 	defer testServer.Close()
 
 	client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 
 	// Insufficient number of pods in the system, and Status.Replicas is wrong;
 	// Status.Replica should update to match number of pods in system, 1 new pod should be created.
@@ -533,7 +533,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
 	client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
 
 	fakePodControl := FakePodControl{}
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 	manager.podControl = &fakePodControl
 
 	controllerSpec := newReplicationController(2)
@@ -572,7 +572,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
 }
 
 func TestPodControllerLookup(t *testing.T) {
-	manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}))
+	manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()}), BurstReplicas)
 	testCases := []struct {
 		inRCs []*api.ReplicationController
 		pod   *api.Pod
@@ -638,7 +638,7 @@ type FakeWatcher struct {
 func TestWatchControllers(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{Watch: fakeWatch}
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 	var testControllerSpec api.ReplicationController
 	received := make(chan string)
 
@@ -679,7 +679,7 @@ func TestWatchControllers(t *testing.T) {
 func TestWatchPods(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{Watch: fakeWatch}
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 
 	// Put one rc and one pod into the controller's stores
 	testControllerSpec := newReplicationController(1)
@@ -722,7 +722,7 @@ func TestWatchPods(t *testing.T) {
 func TestUpdatePods(t *testing.T) {
 	fakeWatch := watch.NewFake()
 	client := &testclient.Fake{Watch: fakeWatch}
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 
 	received := make(chan string)
 
@@ -780,7 +780,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
 	defer testServer.Close()
 
 	client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
-	manager := NewReplicationManager(client)
+	manager := NewReplicationManager(client, BurstReplicas)
 	rc := newReplicationController(1)
 	manager.controllerStore.Store.Add(rc)
 
@@ -852,3 +852,104 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
 		t.Errorf("Expected 1 get and 2 updates, got %d gets %d updates", gets, updates)
 	}
 }
+
+func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
+	client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
+	fakePodControl := FakePodControl{}
+	manager := NewReplicationManager(client, burstReplicas)
+	manager.podControl = &fakePodControl
+
+	controllerSpec := newReplicationController(numReplicas)
+	manager.controllerStore.Store.Add(controllerSpec)
+
+	expectedPods := 0
+	pods := newPodList(nil, numReplicas, api.PodPending, controllerSpec)
+
+	// Size up the controller, then size it down, and confirm the expected create/delete pattern
+	for _, replicas := range []int{numReplicas, 0} {
+
+		controllerSpec.Spec.Replicas = replicas
+		manager.controllerStore.Store.Add(controllerSpec)
+
+		for i := 0; i < numReplicas; i += burstReplicas {
+			manager.syncReplicationController(getKey(controllerSpec, t))
+
+			// The store accrues active pods. It's also used by the rc to determine how many
+			// replicas to create.
+			activePods := len(manager.podStore.Store.List())
+			if replicas != 0 {
+				// This is the number of pods currently "in flight". They were created by the rc manager above,
+				// which then puts the rc to sleep till all of them have been observed.
+				expectedPods = replicas - activePods
+				if expectedPods > burstReplicas {
+					expectedPods = burstReplicas
+				}
+				// This validates the rc manager sync actually created pods
+				validateSyncReplication(t, &fakePodControl, expectedPods, 0)
+
+				// This simulates the watch events for all but 1 of the expected pods.
+				// None of these should wake the controller because it has expectations==BurstReplicas.
+				for _, pod := range pods.Items[:expectedPods-1] {
+					manager.podStore.Store.Add(&pod)
+					manager.addPod(&pod)
+				}
+
+				podExp, exists, err := manager.expectations.GetExpectations(controllerSpec)
+				if !exists || err != nil {
+					t.Fatalf("Did not find expectations for rc.")
+				}
+				if add, _ := podExp.getExpectations(); add != 1 {
+					t.Fatalf("Expectations are wrong %v", podExp)
+				}
+			} else {
+				expectedPods = (replicas - activePods) * -1
+				if expectedPods > burstReplicas {
+					expectedPods = burstReplicas
+				}
+				validateSyncReplication(t, &fakePodControl, 0, expectedPods)
+				for _, pod := range pods.Items[:expectedPods-1] {
+					manager.podStore.Store.Delete(&pod)
+					manager.deletePod(&pod)
+				}
+				podExp, exists, err := manager.expectations.GetExpectations(controllerSpec)
+				if !exists || err != nil {
+					t.Fatalf("Did not find expectations for rc.")
+				}
+				if _, del := podExp.getExpectations(); del != 1 {
+					t.Fatalf("Expectations are wrong %v", podExp)
+				}
+			}
+
+			// Check that the rc didn't take any action for all the above pods
+			fakePodControl.clear()
+			manager.syncReplicationController(getKey(controllerSpec, t))
+			validateSyncReplication(t, &fakePodControl, 0, 0)
+
+			// Create/Delete the last pod
+			// The last add/delete pod will decrease the expectations of the rc to 0,
+			// which will cause it to create/delete the remaining replicas up to burstReplicas.
+			if replicas != 0 {
+				manager.podStore.Store.Add(&pods.Items[expectedPods-1])
+				manager.addPod(&pods.Items[expectedPods-1])
+			} else {
+				manager.podStore.Store.Delete(&pods.Items[expectedPods-1])
+				manager.deletePod(&pods.Items[expectedPods-1])
+			}
+			pods.Items = pods.Items[expectedPods:]
+		}
+
+		// Confirm that we've created the right number of replicas
+		activePods := len(manager.podStore.Store.List())
+		if activePods != controllerSpec.Spec.Replicas {
+			t.Fatalf("Unexpected number of active pods, expected %d, got %d", controllerSpec.Spec.Replicas, activePods)
+		}
+		// Replenish the pod list, since we cut it down sizing up
+		pods = newPodList(nil, replicas, api.PodRunning, controllerSpec)
+	}
+}
+
+func TestControllerBurstReplicas(t *testing.T) {
+	doTestControllerBurstReplicas(t, 5, 30)
+	doTestControllerBurstReplicas(t, 5, 12)
+	doTestControllerBurstReplicas(t, 3, 2)
+}
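
For review context only, and not part of the patch itself: below is a minimal, self-contained Go sketch of the burst-capping pattern that manageReplicas follows after this change. The simpleExpectations type and the create/del callbacks are hypothetical stand-ins for RCExpectations and podControl; only the diff clamping and the expectation bookkeeping are meant to mirror the patch.

package main

import (
	"fmt"
	"sync"
)

// simpleExpectations is a hypothetical stand-in for the rc manager's RCExpectations:
// it only tracks how many creates/deletes the controller still expects to observe.
type simpleExpectations struct {
	mu   sync.Mutex
	adds int
	dels int
}

func (e *simpleExpectations) ExpectCreations(n int) { e.mu.Lock(); e.adds = n; e.mu.Unlock() }
func (e *simpleExpectations) ExpectDeletions(n int) { e.mu.Lock(); e.dels = n; e.mu.Unlock() }
func (e *simpleExpectations) CreationObserved()     { e.mu.Lock(); e.adds--; e.mu.Unlock() }

// manageReplicas mirrors the patched control flow: compute the diff between the pods
// that exist and the pods the rc wants, clamp it to burstReplicas, record expectations,
// then create or delete in parallel and wait for the goroutines to finish.
func manageReplicas(current, desired, burstReplicas int, exp *simpleExpectations,
	create func() error, del func(ix int) error) {
	diff := current - desired
	if diff < 0 {
		diff *= -1
		if diff > burstReplicas {
			// Never create more than burstReplicas pods in a single sync.
			diff = burstReplicas
		}
		exp.ExpectCreations(diff)
		wait := sync.WaitGroup{}
		wait.Add(diff)
		for i := 0; i < diff; i++ {
			go func() {
				defer wait.Done()
				if err := create(); err != nil {
					// The informer will never see this pod, so stop expecting it.
					exp.CreationObserved()
				}
			}()
		}
		wait.Wait()
	} else if diff > 0 {
		if diff > burstReplicas {
			// Never delete more than burstReplicas pods in a single sync.
			diff = burstReplicas
		}
		exp.ExpectDeletions(diff)
		wait := sync.WaitGroup{}
		wait.Add(diff)
		for i := 0; i < diff; i++ {
			go func(ix int) {
				defer wait.Done()
				_ = del(ix)
			}(i)
		}
		wait.Wait()
	}
}

func main() {
	exp := &simpleExpectations{}
	var mu sync.Mutex
	created := 0
	// 0 pods exist and 1200 are desired, but with a burst of 500 only 500
	// creates are issued in this sync.
	manageReplicas(0, 1200, 500, exp,
		func() error { mu.Lock(); created++; mu.Unlock(); return nil },
		func(int) error { return nil })
	fmt.Println("created this sync:", created)
}

Running the sketch prints "created this sync: 500": with 1200 desired replicas and a burst of 500, only 500 creates are issued per sync, and the remaining replicas are only created on later syncs once the watch events for the first burst have been observed, which is the behavior doTestControllerBurstReplicas above exercises with smaller numbers.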