Remove dead pods upon stopping a job

This commit is contained in:
Maciej Szulik 2015-10-29 11:07:00 +01:00
parent 720dc87967
commit e662b34ccd
2 changed files with 78 additions and 8 deletions

View File

@ -23,10 +23,12 @@ import (
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/meta" "k8s.io/kubernetes/pkg/api/meta"
"k8s.io/kubernetes/pkg/apis/extensions"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
) )
@ -221,6 +223,7 @@ func (reaper *DaemonSetReaper) Stop(namespace, name string, timeout time.Duratio
func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) (string, error) { func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gracePeriod *api.DeleteOptions) (string, error) {
jobs := reaper.Extensions().Jobs(namespace) jobs := reaper.Extensions().Jobs(namespace)
pods := reaper.Pods(namespace)
scaler, err := ScalerFor("Job", *reaper) scaler, err := ScalerFor("Job", *reaper)
if err != nil { if err != nil {
return "", err return "", err
@ -241,6 +244,22 @@ func (reaper *JobReaper) Stop(namespace, name string, timeout time.Duration, gra
if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil { if err = scaler.Scale(namespace, name, 0, nil, retry, waitForJobs); err != nil {
return "", err return "", err
} }
// at this point only dead pods are left, that should be removed
selector, _ := extensions.PodSelectorAsSelector(job.Spec.Selector)
podList, err := pods.List(selector, fields.Everything())
if err != nil {
return "", err
}
errList := []error{}
for _, pod := range podList.Items {
if err := pods.Delete(pod.Name, gracePeriod); err != nil {
errList = append(errList, err)
}
}
if len(errList) > 0 {
return "", utilerrors.NewAggregate(errList)
}
// once we have all the pods removed we can safely remove the job itself
if err := jobs.Delete(name, gracePeriod); err != nil { if err := jobs.Delete(name, gracePeriod); err != nil {
return "", err return "", err
} }

View File

@ -19,6 +19,7 @@ package kubectl
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"strings"
"testing" "testing"
"time" "time"
@ -317,7 +318,56 @@ func TestJobStop(t *testing.T) {
}, },
StopError: nil, StopError: nil,
StopMessage: "foo stopped", StopMessage: "foo stopped",
ExpectedActions: []string{"get", "get", "update", "get", "get", "delete"}, ExpectedActions: []string{"get:jobs", "get:jobs", "update:jobs",
"get:jobs", "get:jobs", "list:pods", "delete:jobs"},
},
{
Name: "JobWithDeadPods",
Objs: []runtime.Object{
&extensions.Job{ // GET
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: extensions.JobSpec{
Parallelism: &zero,
Selector: &extensions.PodSelector{
MatchLabels: map[string]string{"k1": "v1"},
},
},
},
&extensions.JobList{ // LIST
Items: []extensions.Job{
{
ObjectMeta: api.ObjectMeta{
Name: name,
Namespace: ns,
},
Spec: extensions.JobSpec{
Parallelism: &zero,
Selector: &extensions.PodSelector{
MatchLabels: map[string]string{"k1": "v1"},
},
},
},
},
},
&api.PodList{ // LIST
Items: []api.Pod{
{
ObjectMeta: api.ObjectMeta{
Name: "pod1",
Namespace: ns,
Labels: map[string]string{"k1": "v1"},
},
},
},
},
},
StopError: nil,
StopMessage: "foo stopped",
ExpectedActions: []string{"get:jobs", "get:jobs", "update:jobs",
"get:jobs", "get:jobs", "list:pods", "delete:pods", "delete:jobs"},
}, },
} }
@ -339,12 +389,13 @@ func TestJobStop(t *testing.T) {
t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions)) t.Errorf("%s unexpected actions: %v, expected %d actions got %d", test.Name, actions, len(test.ExpectedActions), len(actions))
continue continue
} }
for i, verb := range test.ExpectedActions { for i, expAction := range test.ExpectedActions {
if actions[i].GetResource() != "jobs" { action := strings.Split(expAction, ":")
t.Errorf("%s unexpected action: %+v, expected %s-job", test.Name, actions[i], verb) if actions[i].GetVerb() != action[0] {
t.Errorf("%s unexpected verb: %+v, expected %s", test.Name, actions[i], expAction)
} }
if actions[i].GetVerb() != verb { if actions[i].GetResource() != action[1] {
t.Errorf("%s unexpected action: %+v, expected %s-job", test.Name, actions[i], verb) t.Errorf("%s unexpected resource: %+v, expected %s", test.Name, actions[i], expAction)
} }
} }
} }