From 28d9260c0b7a26053ec57da74d1ede7f69099725 Mon Sep 17 00:00:00 2001 From: Prashanth Balasubramanian Date: Fri, 13 Feb 2015 11:57:27 -0800 Subject: [PATCH] Sync replication count with the api server on pod creation/deletion. --- cmd/integration/integration.go | 5 +- pkg/controller/replication_controller.go | 10 +- pkg/controller/replication_controller_test.go | 215 +++++++++++------- pkg/registry/controller/rest.go | 16 -- pkg/registry/controller/rest_test.go | 32 --- 5 files changed, 149 insertions(+), 129 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 5c504980fa0..cbb03a479cc 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -203,9 +203,8 @@ func startComponents(manifestURL string) (apiServerURL string) { controllerManager := replicationControllerPkg.NewReplicationManager(cl) - // Prove that controllerManager's watch works by making it not sync until after this - // test is over. (Hopefully we don't take 10 minutes!) - controllerManager.Run(10 * time.Minute) + // TODO: Write an integration test for the replication controllers watch. + controllerManager.Run(1 * time.Second) nodeResources := &api.NodeResources{} diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go index 190292e2abe..51aff5d878d 100644 --- a/pkg/controller/replication_controller.go +++ b/pkg/controller/replication_controller.go @@ -196,7 +196,8 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati return err } filteredList := FilterActivePods(podList.Items) - diff := len(filteredList) - controller.Spec.Replicas + activePods := len(filteredList) + diff := activePods - controller.Spec.Replicas if diff < 0 { diff *= -1 wait := sync.WaitGroup{} @@ -221,6 +222,13 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati } wait.Wait() } + if controller.Status.Replicas != activePods { + controller.Status.Replicas = activePods + _, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller) + if err != nil { + return err + } + } return nil } diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go index bf9b772bb7d..44fe3e1924a 100644 --- a/pkg/controller/replication_controller_test.go +++ b/pkg/controller/replication_controller_test.go @@ -68,6 +68,8 @@ func (f *FakePodControl) deletePod(namespace string, podName string) error { func newReplicationController(replicas int) api.ReplicationController { return api.ReplicationController{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, + ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "default", ResourceVersion: "18"}, Spec: api.ReplicationControllerSpec{ Replicas: replicas, Template: &api.PodTemplateSpec{ @@ -81,8 +83,14 @@ func newReplicationController(replicas int) api.ReplicationController { Containers: []api.Container{ { Image: "foo/bar", + TerminationMessagePath: api.TerminationMessagePathDefault, + ImagePullPolicy: api.PullIfNotPresent, }, }, + RestartPolicy: api.RestartPolicy{ + Always: &api.RestartPolicyAlways{}, + }, + DNSPolicy: api.DNSDefault, NodeSelector: map[string]string{ "baz": "blah", }, @@ -159,23 +167,37 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) { body := runtime.EncodeOrDie(testapi.Codec(), newPodList(0)) - fakeHandler := util.FakeHandler{ + fakePodHandler := util.FakeHandler{ StatusCode: 200, ResponseBody: string(body), } - testServer := httptest.NewServer(&fakeHandler) + fakePodControl := FakePodControl{} + + controller := newReplicationController(2) + fakeUpdateHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &controller), + T: t, + } + + testServerMux := http.NewServeMux() + testServerMux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler) + testServerMux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", controller.Name), &fakeUpdateHandler) + testServer := httptest.NewServer(testServerMux) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - fakePodControl := FakePodControl{} - manager := NewReplicationManager(client) manager.podControl = &fakePodControl - - controllerSpec := newReplicationController(2) - - manager.syncReplicationController(controllerSpec) + manager.syncReplicationController(controller) validateSyncReplication(t, &fakePodControl, 2, 0) + + // No Status.Replicas update expected even though 2 pods were just created, + // because the controller manager can't observe the pods till the next sync cycle. + if fakeUpdateHandler.RequestReceived != nil { + t.Errorf("Unexpected updates for controller via %v", + fakeUpdateHandler.RequestReceived.URL) + } } func TestCreateReplica(t *testing.T) { @@ -193,33 +215,7 @@ func TestCreateReplica(t *testing.T) { kubeClient: client, } - controllerSpec := api.ReplicationController{ - ObjectMeta: api.ObjectMeta{ - Name: "test", - }, - Spec: api.ReplicationControllerSpec{ - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{ - "name": "foo", - "type": "production", - "replicationController": "test", - }, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "foo/bar", - }, - }, - NodeSelector: map[string]string{ - "foo": "bar", - }, - }, - }, - }, - } - + controllerSpec := newReplicationController(1) podControl.createReplica(ns, controllerSpec) manifest := api.ContainerManifest{} @@ -245,48 +241,13 @@ func TestCreateReplica(t *testing.T) { } } -func TestSynchonize(t *testing.T) { - controllerSpec1 := api.ReplicationController{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, - Spec: api.ReplicationControllerSpec{ - Replicas: 4, - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{ - "name": "foo", - "type": "production", - }, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "foo/bar", - }, - }, - }, - }, - }, - } - controllerSpec2 := api.ReplicationController{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, - Spec: api.ReplicationControllerSpec{ - Replicas: 3, - Template: &api.PodTemplateSpec{ - ObjectMeta: api.ObjectMeta{ - Labels: map[string]string{ - "name": "bar", - "type": "production", - }, - }, - Spec: api.PodSpec{ - Containers: []api.Container{ - { - Image: "bar/baz", - }, - }, - }, - }, - }, +func TestSynchronize(t *testing.T) { + controllerSpec1 := newReplicationController(4) + controllerSpec2 := newReplicationController(3) + controllerSpec2.Name = "bar" + controllerSpec2.Spec.Template.ObjectMeta.Labels = map[string]string{ + "name": "bar", + "type": "production", } fakeEtcd := tools.NewFakeEtcdClient(t) @@ -339,6 +300,106 @@ func TestSynchonize(t *testing.T) { validateSyncReplication(t, &fakePodControl, 7, 0) } +func TestControllerNoReplicaUpdate(t *testing.T) { + // Steady state for the replication controller, no Status.Replicas updates expected + rc := newReplicationController(5) + rc.Status = api.ReplicationControllerStatus{Replicas: 5} + activePods := 5 + + body, _ := latest.Codec.Encode(newPodList(activePods)) + fakePodHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + T: t, + } + fakeControllerHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: runtime.EncodeOrDie(latest.Codec, &api.ReplicationControllerList{ + Items: []api.ReplicationController{rc}, + }), + T: t, + } + fakeUpdateHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &rc), + T: t, + } + + mux := http.NewServeMux() + mux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler) + mux.Handle("/api/"+testapi.Version()+"/replicationControllers/", &fakeControllerHandler) + mux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", rc.Name), &fakeUpdateHandler) + mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusNotFound) + t.Errorf("Unexpected request for %v", req.RequestURI) + }) + testServer := httptest.NewServer(mux) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + manager := NewReplicationManager(client) + fakePodControl := FakePodControl{} + manager.podControl = &fakePodControl + + manager.synchronize() + + validateSyncReplication(t, &fakePodControl, 0, 0) + if fakeUpdateHandler.RequestReceived != nil { + t.Errorf("Unexpected updates for controller via %v", + fakeUpdateHandler.RequestReceived.URL) + } +} + +func TestControllerUpdateReplicas(t *testing.T) { + // Insufficient number of pods in the system, and Status.Replicas is wrong; + // Status.Replica should update to match number of pods in system, 1 new pod should be created. + rc := newReplicationController(5) + rc.Status = api.ReplicationControllerStatus{Replicas: 2} + activePods := 4 + + body, _ := latest.Codec.Encode(newPodList(activePods)) + fakePodHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(body), + T: t, + } + fakeControllerHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: runtime.EncodeOrDie(latest.Codec, &api.ReplicationControllerList{ + Items: []api.ReplicationController{rc}, + }), + T: t, + } + fakeUpdateHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &rc), + T: t, + } + + mux := http.NewServeMux() + + mux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler) + mux.Handle("/api/"+testapi.Version()+"/replicationControllers/", &fakeControllerHandler) + mux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", rc.Name), &fakeUpdateHandler) + mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusNotFound) + t.Errorf("Unexpected request for %v", req.RequestURI) + }) + testServer := httptest.NewServer(mux) + defer testServer.Close() + client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) + manager := NewReplicationManager(client) + fakePodControl := FakePodControl{} + manager.podControl = &fakePodControl + + manager.synchronize() + + // Status.Replicas should go up from 2->4 even though we created 5-4=1 pod + rc.Status = api.ReplicationControllerStatus{Replicas: 4} + decRc := runtime.EncodeOrDie(testapi.Codec(), &rc) + fakeUpdateHandler.ValidateRequest(t, fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s?namespace=%s", rc.Name, rc.Namespace), "PUT", &decRc) + validateSyncReplication(t, &fakePodControl, 1, 0) +} + type FakeWatcher struct { w *watch.FakeWatcher *client.Fake diff --git a/pkg/registry/controller/rest.go b/pkg/registry/controller/rest.go index 9984699082f..ec986fda76f 100644 --- a/pkg/registry/controller/rest.go +++ b/pkg/registry/controller/rest.go @@ -23,7 +23,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation" - rc "github.com/GoogleCloudPlatform/kubernetes/pkg/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" @@ -78,7 +77,6 @@ func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { if err != nil { return nil, err } - rs.fillCurrentState(ctx, controller) return controller, err } @@ -94,7 +92,6 @@ func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Select filtered := []api.ReplicationController{} for _, controller := range controllers.Items { if label.Matches(labels.Set(controller.Labels)) { - rs.fillCurrentState(ctx, &controller) filtered = append(filtered, controller) } } @@ -133,16 +130,3 @@ func (rs *REST) Update(ctx api.Context, obj runtime.Object) (runtime.Object, boo func (rs *REST) Watch(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { return rs.registry.WatchControllers(ctx, label, field, resourceVersion) } - -// TODO #2726: The controller should populate the current state, not the apiserver -func (rs *REST) fillCurrentState(ctx api.Context, controller *api.ReplicationController) error { - if rs.podLister == nil { - return nil - } - list, err := rs.podLister.ListPods(ctx, labels.Set(controller.Spec.Selector).AsSelector()) - if err != nil { - return err - } - controller.Status.Replicas = len(rc.FilterActivePods(list.Items)) - return nil -} diff --git a/pkg/registry/controller/rest_test.go b/pkg/registry/controller/rest_test.go index 2de7bbfbad4..48c672379b8 100644 --- a/pkg/registry/controller/rest_test.go +++ b/pkg/registry/controller/rest_test.go @@ -20,7 +20,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "reflect" "strings" "testing" @@ -354,37 +353,6 @@ func (f *fakePodLister) ListPods(ctx api.Context, s labels.Selector) (*api.PodLi return &f.l, f.e } -func TestFillCurrentState(t *testing.T) { - fakeLister := fakePodLister{ - l: api.PodList{ - Items: []api.Pod{ - {ObjectMeta: api.ObjectMeta{Name: "foo"}}, - {ObjectMeta: api.ObjectMeta{Name: "bar"}}, - }, - }, - } - mockRegistry := registrytest.ControllerRegistry{} - storage := REST{ - registry: &mockRegistry, - podLister: &fakeLister, - } - controller := api.ReplicationController{ - Spec: api.ReplicationControllerSpec{ - Selector: map[string]string{ - "foo": "bar", - }, - }, - } - ctx := api.NewContext() - storage.fillCurrentState(ctx, &controller) - if controller.Status.Replicas != 2 { - t.Errorf("expected 2, got: %d", controller.Status.Replicas) - } - if !reflect.DeepEqual(fakeLister.s, labels.Set(controller.Spec.Selector).AsSelector()) { - t.Errorf("unexpected output: %#v %#v", labels.Set(controller.Spec.Selector).AsSelector(), fakeLister.s) - } -} - // TODO: remove, covered by TestCreate func TestCreateControllerWithGeneratedName(t *testing.T) { storage := NewREST(®istrytest.ControllerRegistry{}, nil)