Sync replication count with the api server on pod creation/deletion.

This commit is contained in:
Prashanth Balasubramanian
2015-02-13 11:57:27 -08:00
parent 1231e65829
commit 28d9260c0b
5 changed files with 149 additions and 129 deletions

View File

@@ -196,7 +196,8 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati
return err
}
filteredList := FilterActivePods(podList.Items)
diff := len(filteredList) - controller.Spec.Replicas
activePods := len(filteredList)
diff := activePods - controller.Spec.Replicas
if diff < 0 {
diff *= -1
wait := sync.WaitGroup{}
@@ -221,6 +222,13 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati
}
wait.Wait()
}
if controller.Status.Replicas != activePods {
controller.Status.Replicas = activePods
_, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller)
if err != nil {
return err
}
}
return nil
}

View File

@@ -68,6 +68,8 @@ func (f *FakePodControl) deletePod(namespace string, podName string) error {
func newReplicationController(replicas int) api.ReplicationController {
return api.ReplicationController{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
ObjectMeta: api.ObjectMeta{Name: "foobar", Namespace: "default", ResourceVersion: "18"},
Spec: api.ReplicationControllerSpec{
Replicas: replicas,
Template: &api.PodTemplateSpec{
@@ -81,8 +83,14 @@ func newReplicationController(replicas int) api.ReplicationController {
Containers: []api.Container{
{
Image: "foo/bar",
TerminationMessagePath: api.TerminationMessagePathDefault,
ImagePullPolicy: api.PullIfNotPresent,
},
},
RestartPolicy: api.RestartPolicy{
Always: &api.RestartPolicyAlways{},
},
DNSPolicy: api.DNSDefault,
NodeSelector: map[string]string{
"baz": "blah",
},
@@ -159,23 +167,37 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
func TestSyncReplicationControllerCreates(t *testing.T) {
body := runtime.EncodeOrDie(testapi.Codec(), newPodList(0))
fakeHandler := util.FakeHandler{
fakePodHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
}
testServer := httptest.NewServer(&fakeHandler)
fakePodControl := FakePodControl{}
controller := newReplicationController(2)
fakeUpdateHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &controller),
T: t,
}
testServerMux := http.NewServeMux()
testServerMux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
testServerMux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", controller.Name), &fakeUpdateHandler)
testServer := httptest.NewServer(testServerMux)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
fakePodControl := FakePodControl{}
manager := NewReplicationManager(client)
manager.podControl = &fakePodControl
controllerSpec := newReplicationController(2)
manager.syncReplicationController(controllerSpec)
manager.syncReplicationController(controller)
validateSyncReplication(t, &fakePodControl, 2, 0)
// No Status.Replicas update expected even though 2 pods were just created,
// because the controller manager can't observe the pods till the next sync cycle.
if fakeUpdateHandler.RequestReceived != nil {
t.Errorf("Unexpected updates for controller via %v",
fakeUpdateHandler.RequestReceived.URL)
}
}
func TestCreateReplica(t *testing.T) {
@@ -193,33 +215,7 @@ func TestCreateReplica(t *testing.T) {
kubeClient: client,
}
controllerSpec := api.ReplicationController{
ObjectMeta: api.ObjectMeta{
Name: "test",
},
Spec: api.ReplicationControllerSpec{
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"name": "foo",
"type": "production",
"replicationController": "test",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo/bar",
},
},
NodeSelector: map[string]string{
"foo": "bar",
},
},
},
},
}
controllerSpec := newReplicationController(1)
podControl.createReplica(ns, controllerSpec)
manifest := api.ContainerManifest{}
@@ -245,48 +241,13 @@ func TestCreateReplica(t *testing.T) {
}
}
func TestSynchonize(t *testing.T) {
controllerSpec1 := api.ReplicationController{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
Spec: api.ReplicationControllerSpec{
Replicas: 4,
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"name": "foo",
"type": "production",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "foo/bar",
},
},
},
},
},
}
controllerSpec2 := api.ReplicationController{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
Spec: api.ReplicationControllerSpec{
Replicas: 3,
Template: &api.PodTemplateSpec{
ObjectMeta: api.ObjectMeta{
Labels: map[string]string{
"name": "bar",
"type": "production",
},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Image: "bar/baz",
},
},
},
},
},
func TestSynchronize(t *testing.T) {
controllerSpec1 := newReplicationController(4)
controllerSpec2 := newReplicationController(3)
controllerSpec2.Name = "bar"
controllerSpec2.Spec.Template.ObjectMeta.Labels = map[string]string{
"name": "bar",
"type": "production",
}
fakeEtcd := tools.NewFakeEtcdClient(t)
@@ -339,6 +300,106 @@ func TestSynchonize(t *testing.T) {
validateSyncReplication(t, &fakePodControl, 7, 0)
}
func TestControllerNoReplicaUpdate(t *testing.T) {
// Steady state for the replication controller, no Status.Replicas updates expected
rc := newReplicationController(5)
rc.Status = api.ReplicationControllerStatus{Replicas: 5}
activePods := 5
body, _ := latest.Codec.Encode(newPodList(activePods))
fakePodHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
T: t,
}
fakeControllerHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(latest.Codec, &api.ReplicationControllerList{
Items: []api.ReplicationController{rc},
}),
T: t,
}
fakeUpdateHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &rc),
T: t,
}
mux := http.NewServeMux()
mux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
mux.Handle("/api/"+testapi.Version()+"/replicationControllers/", &fakeControllerHandler)
mux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", rc.Name), &fakeUpdateHandler)
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
t.Errorf("Unexpected request for %v", req.RequestURI)
})
testServer := httptest.NewServer(mux)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.synchronize()
validateSyncReplication(t, &fakePodControl, 0, 0)
if fakeUpdateHandler.RequestReceived != nil {
t.Errorf("Unexpected updates for controller via %v",
fakeUpdateHandler.RequestReceived.URL)
}
}
func TestControllerUpdateReplicas(t *testing.T) {
// Insufficient number of pods in the system, and Status.Replicas is wrong;
// Status.Replica should update to match number of pods in system, 1 new pod should be created.
rc := newReplicationController(5)
rc.Status = api.ReplicationControllerStatus{Replicas: 2}
activePods := 4
body, _ := latest.Codec.Encode(newPodList(activePods))
fakePodHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: string(body),
T: t,
}
fakeControllerHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(latest.Codec, &api.ReplicationControllerList{
Items: []api.ReplicationController{rc},
}),
T: t,
}
fakeUpdateHandler := util.FakeHandler{
StatusCode: 200,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), &rc),
T: t,
}
mux := http.NewServeMux()
mux.Handle("/api/"+testapi.Version()+"/pods/", &fakePodHandler)
mux.Handle("/api/"+testapi.Version()+"/replicationControllers/", &fakeControllerHandler)
mux.Handle(fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s", rc.Name), &fakeUpdateHandler)
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusNotFound)
t.Errorf("Unexpected request for %v", req.RequestURI)
})
testServer := httptest.NewServer(mux)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
manager := NewReplicationManager(client)
fakePodControl := FakePodControl{}
manager.podControl = &fakePodControl
manager.synchronize()
// Status.Replicas should go up from 2->4 even though we created 5-4=1 pod
rc.Status = api.ReplicationControllerStatus{Replicas: 4}
decRc := runtime.EncodeOrDie(testapi.Codec(), &rc)
fakeUpdateHandler.ValidateRequest(t, fmt.Sprintf("/api/"+testapi.Version()+"/replicationControllers/%s?namespace=%s", rc.Name, rc.Namespace), "PUT", &decRc)
validateSyncReplication(t, &fakePodControl, 1, 0)
}
type FakeWatcher struct {
w *watch.FakeWatcher
*client.Fake