diff --git a/pkg/api/resource_helpers.go b/pkg/api/resource_helpers.go
index 7dff8b94121..e4f8f7edeb1 100644
--- a/pkg/api/resource_helpers.go
+++ b/pkg/api/resource_helpers.go
@@ -58,3 +58,13 @@ func GetExistingContainerStatus(statuses []ContainerStatus, name string) Contain
 	}
 	return ContainerStatus{}
 }
+
+// IsPodReady returns true if a pod is ready; false otherwise.
+func IsPodReady(pod *Pod) bool {
+	for _, c := range pod.Status.Conditions {
+		if c.Type == PodReady && c.Status == ConditionTrue {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/controller/replication_controller.go b/pkg/controller/replication_controller.go
index b74578261e9..0e43b909911 100644
--- a/pkg/controller/replication_controller.go
+++ b/pkg/controller/replication_controller.go
@@ -18,6 +18,7 @@ package controller
 
 import (
 	"fmt"
+	"sort"
 	"sync"
 	"time"
@@ -163,7 +164,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
 		if !ok {
 			if status, ok := event.Object.(*api.Status); ok {
 				if status.Status == api.StatusFailure {
-					glog.Errorf("failed to watch: %v", status)
+					glog.Errorf("Failed to watch: %v", status)
 					// Clear resource version here, as above, this won't hurt consistency, but we
 					// should consider introspecting more carefully here. (or make the apiserver smarter)
 					// "why not both?"
@@ -178,7 +179,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
 		*resourceVersion = rc.ResourceVersion
 		// Sync even if this is a deletion event, to ensure that we leave
 		// it in the desired state.
-		glog.V(4).Infof("About to sync from watch: %v", rc.Name)
+		glog.V(4).Infof("About to sync from watch: %q", rc.Name)
 		if err := rm.syncHandler(*rc); err != nil {
 			util.HandleError(fmt.Errorf("unexpected sync error: %v", err))
 		}
@@ -186,33 +187,55 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
 	}
 }
 
-// Helper function. Also used in pkg/registry/controller, for now.
-func FilterActivePods(pods []api.Pod) []*api.Pod {
+// filterActivePods returns pods that have not terminated.
+func filterActivePods(pods []api.Pod) []*api.Pod {
 	var result []*api.Pod
 	for i := range pods {
 		value := &pods[i]
 		if api.PodSucceeded != value.Status.Phase &&
 			api.PodFailed != value.Status.Phase {
 			result = append(result, value)
 		}
 	}
 	return result
 }
 
+type activePods []*api.Pod
+
+func (s activePods) Len() int      { return len(s) }
+func (s activePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s activePods) Less(i, j int) bool {
+	// Unassigned < assigned
+	if s[i].Spec.Host == "" && s[j].Spec.Host != "" {
+		return true
+	}
+	// PodPending < PodUnknown < PodRunning
+	m := map[api.PodPhase]int{api.PodPending: 0, api.PodUnknown: 1, api.PodRunning: 2}
+	if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
+		return m[s[i].Status.Phase] < m[s[j].Status.Phase]
+	}
+	// Not ready < ready
+	if !api.IsPodReady(s[i]) && api.IsPodReady(s[j]) {
+		return true
+	}
+	return false
+}
+
 func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error {
 	s := labels.Set(controller.Spec.Selector).AsSelector()
 	podList, err := rm.kubeClient.Pods(controller.Namespace).List(s)
 	if err != nil {
 		return err
 	}
-	filteredList := FilterActivePods(podList.Items)
-	activePods := len(filteredList)
-	diff := activePods - controller.Spec.Replicas
+	filteredList := filterActivePods(podList.Items)
+	numActivePods := len(filteredList)
+	diff := numActivePods - controller.Spec.Replicas
 	if diff < 0 {
 		diff *= -1
 		wait := sync.WaitGroup{}
 		wait.Add(diff)
-		glog.V(2).Infof("Too few \"%s\" replicas, creating %d\n", controller.Name, diff)
+		glog.V(2).Infof("Too few %q replicas, creating %d", controller.Name, diff)
 		for i := 0; i < diff; i++ {
 			go func() {
 				defer wait.Done()
@@ -221,7 +244,12 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati
 		}
 		wait.Wait()
 	} else if diff > 0 {
-		glog.V(2).Infof("Too many \"%s\" replicas, deleting %d\n", controller.Name, diff)
+		glog.V(2).Infof("Too many %q replicas, deleting %d", controller.Name, diff)
+		// Sort the pods in the order such that not-ready < ready, unscheduled
+		// < scheduled, and pending < running. This ensures that we delete pods
+		// in the earlier stages whenever possible.
+		sort.Sort(activePods(filteredList))
+
 		wait := sync.WaitGroup{}
 		wait.Add(diff)
 		for i := 0; i < diff; i++ {
@@ -232,8 +260,8 @@ func (rm *ReplicationManager) syncReplicationController(controller api.Replicati
 		}
 		wait.Wait()
 	}
-	if controller.Status.Replicas != activePods {
-		controller.Status.Replicas = activePods
+	if controller.Status.Replicas != numActivePods {
+		controller.Status.Replicas = numActivePods
 		_, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller)
 		if err != nil {
 			return err
diff --git a/pkg/controller/replication_controller_test.go b/pkg/controller/replication_controller_test.go
index 8adb7094ba5..aac5fc7bcb3 100644
--- a/pkg/controller/replication_controller_test.go
+++ b/pkg/controller/replication_controller_test.go
@@ -18,8 +18,11 @@ package controller
 
 import (
 	"fmt"
+	"math/rand"
 	"net/http"
 	"net/http/httptest"
+	"reflect"
+	"sort"
 	"sync"
 	"testing"
 	"time"
@@ -375,3 +378,52 @@ func TestWatchControllers(t *testing.T) {
 		t.Errorf("Expected 1 call but got 0")
 	}
 }
+
+func TestSortingActivePods(t *testing.T) {
+	numPods := 5
+	podList := newPodList(numPods)
+	pods := make([]*api.Pod, len(podList.Items))
+	for i := range podList.Items {
+		pods[i] = &podList.Items[i]
+	}
+	// pods[0] is not scheduled yet.
+	pods[0].Spec.Host = ""
+	pods[0].Status.Phase = api.PodPending
+	// pods[1] is scheduled but pending.
+	pods[1].Spec.Host = "bar"
+	pods[1].Status.Phase = api.PodPending
+	// pods[2] is unknown.
+	pods[2].Spec.Host = "foo"
+	pods[2].Status.Phase = api.PodUnknown
+	// pods[3] is running but not ready.
+	pods[3].Spec.Host = "foo"
+	pods[3].Status.Phase = api.PodRunning
+	// pods[4] is running and ready.
+	pods[4].Spec.Host = "foo"
+	pods[4].Status.Phase = api.PodRunning
+	pods[4].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}}
+
+	getOrder := func(pods []*api.Pod) []string {
+		names := make([]string, len(pods))
+		for i := range pods {
+			names[i] = pods[i].Name
+		}
+		return names
+	}
+
+	expected := getOrder(pods)
+
+	for i := 0; i < 20; i++ {
+		idx := rand.Perm(numPods)
+		randomizedPods := make([]*api.Pod, numPods)
+		for j := 0; j < numPods; j++ {
+			randomizedPods[j] = pods[idx[j]]
+		}
+		sort.Sort(activePods(randomizedPods))
+		actual := getOrder(randomizedPods)
+
+		if !reflect.DeepEqual(actual, expected) {
+			t.Errorf("expected %v, got %v", expected, actual)
+		}
+	}
+}
diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go
index 1be624dc97f..b5c0f317998 100644
--- a/pkg/service/endpoints_controller.go
+++ b/pkg/service/endpoints_controller.go
@@ -315,14 +315,7 @@ func (e *EndpointController) syncService(key string) {
 			continue
 		}
 
-		inService := false
-		for _, c := range pod.Status.Conditions {
-			if c.Type == api.PodReady && c.Status == api.ConditionTrue {
-				inService = true
-				break
-			}
-		}
-		if !inService {
+		if !api.IsPodReady(pod) {
			glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
 			continue
 		}
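
Reviewer note: the deletion-preference ordering above is easy to sanity-check in isolation. The sketch below is a minimal, self-contained Go program, not part of this diff; miniPod, phaseRank, and byDeletionOrder are illustrative stand-ins for api.Pod and the activePods type, and it simply mirrors the Less logic under those assumptions.

package main

import (
	"fmt"
	"sort"
)

// miniPod is a simplified stand-in for api.Pod, for illustration only.
type miniPod struct {
	Name  string
	Host  string // "" means the pod has not been scheduled yet
	Phase string // "Pending", "Unknown", or "Running"
	Ready bool
}

// phaseRank mirrors the map in activePods.Less: Pending < Unknown < Running.
var phaseRank = map[string]int{"Pending": 0, "Unknown": 1, "Running": 2}

// byDeletionOrder mirrors the activePods ordering from the diff:
// unscheduled < scheduled, then by phase, then not-ready < ready.
type byDeletionOrder []miniPod

func (s byDeletionOrder) Len() int      { return len(s) }
func (s byDeletionOrder) Swap(i, j int) { s[i], s[j] = s[j], s[i] }

func (s byDeletionOrder) Less(i, j int) bool {
	// Unassigned < assigned.
	if s[i].Host == "" && s[j].Host != "" {
		return true
	}
	// Pending < Unknown < Running.
	if phaseRank[s[i].Phase] != phaseRank[s[j].Phase] {
		return phaseRank[s[i].Phase] < phaseRank[s[j].Phase]
	}
	// Not ready < ready.
	return !s[i].Ready && s[j].Ready
}

func main() {
	pods := []miniPod{
		{Name: "ready", Host: "node1", Phase: "Running", Ready: true},
		{Name: "unscheduled", Phase: "Pending"},
		{Name: "unready", Host: "node1", Phase: "Running"},
		{Name: "pending", Host: "node2", Phase: "Pending"},
	}
	sort.Sort(byDeletionOrder(pods))
	// Prints: unscheduled, pending, unready, ready. The front of the slice
	// is what syncReplicationController would delete first when scaling down,
	// so the least-established pods go before ready, serving ones.
	for _, p := range pods {
		fmt.Println(p.Name)
	}
}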