Prioritize deleting non-running pods when reducing replicas

This change instructs the replication controller to delete replicas in the
order "unscheduled (pending)", then "scheduled (pending)", then "scheduled
(running)". This is less disruptive than deleting pods at random.
Yu-Ju Hong
2015-04-16 17:37:57 -07:00
parent b944049fe9
commit df2cbd4877
4 changed files with 104 additions and 22 deletions

View File

@@ -18,6 +18,7 @@ package controller
 import (
     "fmt"
+    "sort"
     "sync"
     "time"
@@ -163,7 +164,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
             if !ok {
                 if status, ok := event.Object.(*api.Status); ok {
                     if status.Status == api.StatusFailure {
-                        glog.Errorf("failed to watch: %v", status)
+                        glog.Errorf("Failed to watch: %v", status)
                         // Clear resource version here, as above, this won't hurt consistency, but we
                         // should consider introspecting more carefully here. (or make the apiserver smarter)
                         // "why not both?"
@@ -178,7 +179,7 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
         *resourceVersion = rc.ResourceVersion
         // Sync even if this is a deletion event, to ensure that we leave
         // it in the desired state.
-        glog.V(4).Infof("About to sync from watch: %v", rc.Name)
+        glog.V(4).Infof("About to sync from watch: %q", rc.Name)
         if err := rm.syncHandler(*rc); err != nil {
             util.HandleError(fmt.Errorf("unexpected sync error: %v", err))
         }
@@ -186,33 +187,54 @@ func (rm *ReplicationManager) watchControllers(resourceVersion *string) {
     }
 }
 
-// Helper function. Also used in pkg/registry/controller, for now.
-func FilterActivePods(pods []api.Pod) []*api.Pod {
+// filterActivePods returns pods that have not terminated.
+func filterActivePods(pods []api.Pod) []*api.Pod {
     var result []*api.Pod
-    for i := range pods {
-        value := &pods[i]
+    for _, value := range pods {
         if api.PodSucceeded != value.Status.Phase &&
             api.PodFailed != value.Status.Phase {
-            result = append(result, value)
+            result = append(result, &value)
         }
     }
     return result
 }
 
+type activePods []*api.Pod
+
+func (s activePods) Len() int      { return len(s) }
+func (s activePods) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
+func (s activePods) Less(i, j int) bool {
+    // Unassigned < assigned
+    if s[i].Spec.Host == "" && s[j].Spec.Host != "" {
+        return true
+    }
+    // PodPending < PodUnknown < PodRunning
+    m := map[api.PodPhase]int{api.PodPending: 0, api.PodUnknown: 1, api.PodRunning: 2}
+    if m[s[i].Status.Phase] != m[s[j].Status.Phase] {
+        return m[s[i].Status.Phase] < m[s[j].Status.Phase]
+    }
+    // Not ready < ready
+    if !api.IsPodReady(s[i]) && api.IsPodReady(s[j]) {
+        return true
+    }
+    return false
+}
+
 func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error {
     s := labels.Set(controller.Spec.Selector).AsSelector()
     podList, err := rm.kubeClient.Pods(controller.Namespace).List(s)
     if err != nil {
         return err
     }
-    filteredList := FilterActivePods(podList.Items)
-    activePods := len(filteredList)
-    diff := activePods - controller.Spec.Replicas
+    filteredList := filterActivePods(podList.Items)
+    numActivePods := len(filteredList)
+    diff := numActivePods - controller.Spec.Replicas
     if diff < 0 {
         diff *= -1
         wait := sync.WaitGroup{}
         wait.Add(diff)
-        glog.V(2).Infof("Too few \"%s\" replicas, creating %d\n", controller.Name, diff)
+        glog.V(2).Infof("Too few %q replicas, creating %d", controller.Name, diff)
         for i := 0; i < diff; i++ {
             go func() {
                 defer wait.Done()
@@ -221,7 +243,12 @@ func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error {
         }
         wait.Wait()
     } else if diff > 0 {
-        glog.V(2).Infof("Too many \"%s\" replicas, deleting %d\n", controller.Name, diff)
+        glog.V(2).Infof("Too many %q replicas, deleting %d", controller.Name, diff)
+        // Sort the pods in the order such that not-ready < ready, unscheduled
+        // < scheduled, and pending < running. This ensures that we delete pods
+        // in the earlier stages whenever possible.
+        sort.Sort(activePods(filteredList))
+
         wait := sync.WaitGroup{}
         wait.Add(diff)
         for i := 0; i < diff; i++ {
@@ -232,8 +259,8 @@ func (rm *ReplicationManager) syncReplicationController(controller api.ReplicationController) error {
         }
         wait.Wait()
     }
-    if controller.Status.Replicas != activePods {
-        controller.Status.Replicas = activePods
+    if controller.Status.Replicas != numActivePods {
+        controller.Status.Replicas = numActivePods
         _, err = rm.kubeClient.ReplicationControllers(controller.Namespace).Update(&controller)
         if err != nil {
             return err
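
One hazard worth flagging in the rewritten filterActivePods loop: before Go 1.22, a range loop reuses a single loop variable, so appending &value stores the same pointer on every iteration. A minimal standalone demonstration (hypothetical pod struct, not this repository's code):

    package main

    import "fmt"

    type pod struct{ Name string }

    func main() {
        pods := []pod{{"a"}, {"b"}, {"c"}}
        var result []*pod
        for _, value := range pods {
            result = append(result, &value) // &value aliases the loop variable
        }
        for _, p := range result {
            fmt.Println(p.Name) // prints "c" three times on Go < 1.22
        }
    }

The form being removed, which indexes into the slice (value := &pods[i]), does not have this problem.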

View File

@@ -18,8 +18,11 @@ package controller
 import (
     "fmt"
+    "math/rand"
     "net/http"
     "net/http/httptest"
+    "reflect"
+    "sort"
     "sync"
     "testing"
     "time"
@@ -375,3 +378,52 @@ func TestWatchControllers(t *testing.T) {
t.Errorf("Expected 1 call but got 0")
}
}
func TestSortingActivePods(t *testing.T) {
numPods := 5
podList := newPodList(numPods)
pods := make([]*api.Pod, len(podList.Items))
for i := range podList.Items {
pods[i] = &podList.Items[i]
}
// pods[0] is not scheduled yet.
pods[0].Spec.Host = ""
pods[0].Status.Phase = api.PodPending
// pods[1] is scheduled but pending.
pods[1].Spec.Host = "bar"
pods[1].Status.Phase = api.PodPending
// pods[2] is unknown.
pods[2].Spec.Host = "foo"
pods[2].Status.Phase = api.PodUnknown
// pods[3] is running but not ready.
pods[3].Spec.Host = "foo"
pods[3].Status.Phase = api.PodRunning
// pods[4] is running and ready.
pods[4].Spec.Host = "foo"
pods[4].Status.Phase = api.PodRunning
pods[4].Status.Conditions = []api.PodCondition{{Type: api.PodReady, Status: api.ConditionTrue}}
getOrder := func(pods []*api.Pod) []string {
names := make([]string, len(pods))
for i := range pods {
names[i] = pods[i].Name
}
return names
}
expected := getOrder(pods)
for i := 0; i < 20; i++ {
idx := rand.Perm(numPods)
randomizedPods := make([]*api.Pod, numPods)
for j := 0; j < numPods; j++ {
randomizedPods[j] = pods[idx[j]]
}
sort.Sort(activePods(randomizedPods))
actual := getOrder(randomizedPods)
if !reflect.DeepEqual(actual, expected) {
t.Errorf("expected %v, got %v", expected, actual)
}
}
}
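
The test above is a compact property-style check: fix a canonical order, shuffle it, sort, and require the canonical order back. A generic sketch of the same pattern for an arbitrary Less function, with hypothetical names (checkCanonicalOrder is not a helper from this commit):

    package sortcheck

    import (
        "math/rand"
        "reflect"
        "sort"
        "testing"
    )

    // checkCanonicalOrder sorts random permutations of canonical and
    // verifies each one comes back in the canonical order.
    func checkCanonicalOrder(t *testing.T, canonical []string, less func(a, b string) bool, trials int) {
        for n := 0; n < trials; n++ {
            shuffled := make([]string, len(canonical))
            for dst, src := range rand.Perm(len(canonical)) {
                shuffled[dst] = canonical[src]
            }
            sort.Slice(shuffled, func(i, j int) bool { return less(shuffled[i], shuffled[j]) })
            if !reflect.DeepEqual(shuffled, canonical) {
                t.Errorf("trial %d: expected %v, got %v", n, canonical, shuffled)
            }
        }
    }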