Merge pull request #98238 from alculquicondor/job-completion

Track Job completion through Pod finalizers and status
commit 03fa68099e
Authored by Kubernetes Prow Robot on 2021-07-09 08:42:54 -07:00; committed by GitHub.
10 changed files with 2548 additions and 714 deletions

View File

@@ -124,6 +124,7 @@ type PodControllerRefManager struct {
 	BaseControllerRefManager
 	controllerKind schema.GroupVersionKind
 	podControl     PodControlInterface
+	finalizers     []string
 }

 // NewPodControllerRefManager returns a PodControllerRefManager that exposes
@@ -143,6 +144,7 @@ func NewPodControllerRefManager(
 	selector labels.Selector,
 	controllerKind schema.GroupVersionKind,
 	canAdopt func() error,
+	finalizers ...string,
 ) *PodControllerRefManager {
 	return &PodControllerRefManager{
 		BaseControllerRefManager: BaseControllerRefManager{
@@ -152,6 +154,7 @@ func NewPodControllerRefManager(
 		},
 		controllerKind: controllerKind,
 		podControl:     podControl,
+		finalizers:     finalizers,
 	}
 }
@@ -216,7 +219,7 @@ func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
 	// Note that ValidateOwnerReferences() will reject this patch if another
 	// OwnerReference exists with controller=true.
-	patchBytes, err := ownerRefControllerPatch(m.Controller, m.controllerKind, pod.UID)
+	patchBytes, err := ownerRefControllerPatch(m.Controller, m.controllerKind, pod.UID, m.finalizers...)
 	if err != nil {
 		return err
 	}
@@ -228,7 +231,7 @@ func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
 func (m *PodControllerRefManager) ReleasePod(pod *v1.Pod) error {
 	klog.V(2).Infof("patching pod %s_%s to remove its controllerRef to %s/%s:%s",
 		pod.Namespace, pod.Name, m.controllerKind.GroupVersion(), m.controllerKind.Kind, m.Controller.GetName())
-	patchBytes, err := deleteOwnerRefStrategicMergePatch(pod.UID, m.Controller.GetUID())
+	patchBytes, err := deleteOwnerRefStrategicMergePatch(pod.UID, m.Controller.GetUID(), m.finalizers...)
 	if err != nil {
 		return err
 	}
@@ -520,17 +523,20 @@ type objectForDeleteOwnerRefStrategicMergePatch struct {
 type objectMetaForMergePatch struct {
 	UID             types.UID           `json:"uid"`
 	OwnerReferences []map[string]string `json:"ownerReferences"`
+	DeleteFinalizers []string           `json:"$deleteFromPrimitiveList/finalizers,omitempty"`
 }

-func deleteOwnerRefStrategicMergePatch(dependentUID types.UID, ownerUIDs ...types.UID) ([]byte, error) {
-	var pieces []map[string]string
-	for _, ownerUID := range ownerUIDs {
-		pieces = append(pieces, map[string]string{"$patch": "delete", "uid": string(ownerUID)})
-	}
+func deleteOwnerRefStrategicMergePatch(dependentUID types.UID, ownerUID types.UID, finalizers ...string) ([]byte, error) {
 	patch := objectForDeleteOwnerRefStrategicMergePatch{
 		Metadata: objectMetaForMergePatch{
 			UID: dependentUID,
-			OwnerReferences: pieces,
+			OwnerReferences: []map[string]string{
+				{
+					"$patch": "delete",
+					"uid":    string(ownerUID),
+				},
+			},
+			DeleteFinalizers: finalizers,
 		},
 	}
 	patchBytes, err := json.Marshal(&patch)
@@ -547,9 +553,10 @@ type objectForAddOwnerRefPatch struct {
 type objectMetaForPatch struct {
 	OwnerReferences []metav1.OwnerReference `json:"ownerReferences"`
 	UID             types.UID               `json:"uid"`
+	Finalizers      []string                `json:"finalizers,omitempty"`
 }

-func ownerRefControllerPatch(controller metav1.Object, controllerKind schema.GroupVersionKind, uid types.UID) ([]byte, error) {
+func ownerRefControllerPatch(controller metav1.Object, controllerKind schema.GroupVersionKind, uid types.UID, finalizers ...string) ([]byte, error) {
 	blockOwnerDeletion := true
 	isController := true
 	addControllerPatch := objectForAddOwnerRefPatch{
@@ -565,6 +572,7 @@ func ownerRefControllerPatch(controller metav1.Object, controllerKind schema.Gro
 				BlockOwnerDeletion: &blockOwnerDeletion,
 			},
 		},
+		Finalizers: finalizers,
 		},
 	}
 	patchBytes, err := json.Marshal(&addControllerPatch)
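For reference, a small standalone sketch (not part of the commit) of the strategic merge patch that deleteOwnerRefStrategicMergePatch now produces when a controller releases a Pod. The struct definitions mirror the hunk above; types.UID is replaced with a plain string so the sketch compiles without Kubernetes dependencies, the outer wrapper is assumed to carry a `metadata` field as in the existing code, and the finalizer shown is the Job tracking finalizer that the Job controller passes in later in this PR.

package main

import (
	"encoding/json"
	"fmt"
)

type objectMetaForMergePatch struct {
	UID              string              `json:"uid"`
	OwnerReferences  []map[string]string `json:"ownerReferences"`
	DeleteFinalizers []string            `json:"$deleteFromPrimitiveList/finalizers,omitempty"`
}

type objectForDeleteOwnerRefStrategicMergePatch struct {
	Metadata objectMetaForMergePatch `json:"metadata"`
}

func main() {
	// Release a Pod: drop the controller's ownerReference and ask the
	// strategic merge patch to remove the tracking finalizer in one call.
	patch := objectForDeleteOwnerRefStrategicMergePatch{
		Metadata: objectMetaForMergePatch{
			UID: "pod-uid",
			OwnerReferences: []map[string]string{
				{"$patch": "delete", "uid": "controller-uid"},
			},
			DeleteFinalizers: []string{"batch.kubernetes.io/job-tracking"},
		},
	}
	b, _ := json.Marshal(&patch)
	fmt.Println(string(b))
	// {"metadata":{"uid":"pod-uid","ownerReferences":[{"$patch":"delete","uid":"controller-uid"}],
	//  "$deleteFromPrimitiveList/finalizers":["batch.kubernetes.io/job-tracking"]}}
}

The adoption patch built by ownerRefControllerPatch is the mirror image: it adds the controller ownerReference and appends the same finalizer under metadata.finalizers.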

View File

@@ -17,9 +17,10 @@ limitations under the License.
 package controller

 import (
-	"reflect"
+	"strings"
 	"testing"

+	"github.com/google/go-cmp/cmp"
 	apps "k8s.io/api/apps/v1"
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -63,6 +64,7 @@ func TestClaimPods(t *testing.T) {
manager *PodControllerRefManager manager *PodControllerRefManager
pods []*v1.Pod pods []*v1.Pod
claimed []*v1.Pod claimed []*v1.Pod
patches int
} }
var tests = []test{ var tests = []test{
{ {
@ -74,6 +76,7 @@ func TestClaimPods(t *testing.T) {
func() error { return nil }), func() error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, nil), newPod("pod2", testLabel, nil)}, pods: []*v1.Pod{newPod("pod1", productionLabel, nil), newPod("pod2", testLabel, nil)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, nil)}, claimed: []*v1.Pod{newPod("pod1", productionLabel, nil)},
patches: 1,
}, },
func() test { func() test {
controller := v1.ReplicationController{} controller := v1.ReplicationController{}
@ -135,6 +138,7 @@ func TestClaimPods(t *testing.T) {
func() error { return nil }), func() error { return nil }),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller)}, pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)}, claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller)},
patches: 1,
} }
}(), }(),
func() test { func() test {
@ -157,22 +161,50 @@ func TestClaimPods(t *testing.T) {
claimed: []*v1.Pod{podToDelete1}, claimed: []*v1.Pod{podToDelete1},
} }
}(), }(),
func() test {
controller := v1.ReplicationController{}
controller.UID = types.UID(controllerUID)
return test{
name: "Controller claims or release pods according to selector with finalizers",
manager: NewPodControllerRefManager(&FakePodControl{},
&controller,
productionLabelSelector,
controllerKind,
func() error { return nil },
"foo-finalizer", "bar-finalizer"),
pods: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod2", testLabel, &controller), newPod("pod3", productionLabel, nil)},
claimed: []*v1.Pod{newPod("pod1", productionLabel, &controller), newPod("pod3", productionLabel, nil)},
patches: 2,
}
}(),
} }
 	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
 			claimed, err := test.manager.ClaimPods(test.pods)
 			if err != nil {
-				t.Errorf("Test case `%s`, unexpected error: %v", test.name, err)
-			} else if !reflect.DeepEqual(test.claimed, claimed) {
-				t.Errorf("Test case `%s`, claimed wrong pods. Expected %v, got %v", test.name, podToStringSlice(test.claimed), podToStringSlice(claimed))
+				t.Fatalf("Unexpected error: %v", err)
 			}
+			if diff := cmp.Diff(test.claimed, claimed); diff != "" {
+				t.Errorf("Claimed wrong pods (-want,+got):\n%s", diff)
+			}
+			fakePodControl, ok := test.manager.podControl.(*FakePodControl)
+			if !ok {
+				return
+			}
+			if p := len(fakePodControl.Patches); p != test.patches {
+				t.Errorf("ClaimPods issues %d patches, want %d", p, test.patches)
+			}
+			for _, p := range fakePodControl.Patches {
+				patch := string(p)
+				if uid := string(test.manager.Controller.GetUID()); !strings.Contains(patch, uid) {
+					t.Errorf("Patch doesn't contain controller UID %s", uid)
+				}
+				for _, f := range test.manager.finalizers {
+					if !strings.Contains(patch, f) {
+						t.Errorf("Patch doesn't contain finalizer %q", f)
+					}
+				}
+			}
+		})
 	}
 }
-
-func podToStringSlice(pods []*v1.Pod) []string {
-	var names []string
-	for _, pod := range pods {
-		names = append(names, pod.Name)
-	}
-	return names
-}

View File

@@ -18,7 +18,6 @@ package job

 import (
 	"fmt"
-	"math"
 	"sort"
 	"strconv"
 	"strings"
@@ -27,6 +26,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apiserver/pkg/storage/names"
+	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller"
 )
@@ -39,85 +39,184 @@ func isIndexedJob(job *batch.Job) bool {
 	return job.Spec.CompletionMode != nil && *job.Spec.CompletionMode == batch.IndexedCompletion
 }

-// calculateSucceededIndexes returns a string representation of the list of
-// succeeded indexes in compressed format and the number of succeeded indexes.
-func calculateSucceededIndexes(pods []*v1.Pod, completions int32) (string, int32) {
-	sort.Sort(byCompletionIndex(pods))
-	var result strings.Builder
-	var lastSucceeded int
-	var count int32
-	firstSucceeded := math.MinInt32
-	for _, p := range pods {
-		ix := getCompletionIndex(p.Annotations)
-		if ix == unknownCompletionIndex {
-			continue
-		}
-		if ix >= int(completions) {
-			break
-		}
-		if p.Status.Phase == v1.PodSucceeded {
-			if firstSucceeded == math.MinInt32 {
-				firstSucceeded = ix
-			} else if ix > lastSucceeded+1 {
-				addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded)
-				count += int32(lastSucceeded - firstSucceeded + 1)
-				firstSucceeded = ix
-			}
-			lastSucceeded = ix
-		}
-	}
-	if firstSucceeded != math.MinInt32 {
-		addSingleOrRangeStr(&result, firstSucceeded, lastSucceeded)
-		count += int32(lastSucceeded - firstSucceeded + 1)
-	}
-	return result.String(), count
-}
+type interval struct {
+	First int
+	Last  int
+}

-func addSingleOrRangeStr(builder *strings.Builder, first, last int) {
+type orderedIntervals []interval
// calculateSucceededIndexes returns the old and new list of succeeded indexes
// in compressed format (intervals).
// The old list is solely based off .status.completedIndexes, but returns an
// empty list if this Job is not tracked with finalizers. The new list includes
// the indexes that succeeded since the last sync.
func calculateSucceededIndexes(job *batch.Job, pods []*v1.Pod) (orderedIntervals, orderedIntervals) {
var prevIntervals orderedIntervals
withFinalizers := trackingUncountedPods(job)
if withFinalizers {
prevIntervals = succeededIndexesFromJob(job)
}
newSucceeded := sets.NewInt()
for _, p := range pods {
ix := getCompletionIndex(p.Annotations)
// Succeeded Pod with valid index and, if tracking with finalizers,
// has a finalizer (meaning that it is not counted yet).
if p.Status.Phase == v1.PodSucceeded && ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && (!withFinalizers || hasJobTrackingFinalizer(p)) {
newSucceeded.Insert(ix)
}
}
// List returns the items of the set in order.
result := prevIntervals.withOrderedIndexes(newSucceeded.List())
return prevIntervals, result
}
// withOrderedIndexes returns a new list of ordered intervals that contains
// the newIndexes, provided in increasing order.
func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals {
var result orderedIntervals
i := 0
j := 0
var lastInterval *interval
appendOrMergeWithLastInterval := func(thisInterval interval) {
if lastInterval == nil || thisInterval.First > lastInterval.Last+1 {
result = append(result, thisInterval)
lastInterval = &result[len(result)-1]
} else if lastInterval.Last < thisInterval.Last {
lastInterval.Last = thisInterval.Last
}
}
for i < len(oi) && j < len(newIndexes) {
if oi[i].First < newIndexes[j] {
appendOrMergeWithLastInterval(oi[i])
i++
} else {
appendOrMergeWithLastInterval(interval{newIndexes[j], newIndexes[j]})
j++
}
}
for i < len(oi) {
appendOrMergeWithLastInterval(oi[i])
i++
}
for j < len(newIndexes) {
appendOrMergeWithLastInterval(interval{newIndexes[j], newIndexes[j]})
j++
}
return result
}
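Below is a standalone sketch (not part of the commit) of how this merge behaves; the interval code is copied from the hunk above and the input mirrors the "prev and new separate" case in the test file further down.

package main

import "fmt"

type interval struct {
	First int
	Last  int
}

type orderedIntervals []interval

// withOrderedIndexes merges previously completed intervals with newly
// succeeded indexes, which must be provided in increasing order.
func (oi orderedIntervals) withOrderedIndexes(newIndexes []int) orderedIntervals {
	var result orderedIntervals
	i, j := 0, 0
	var lastInterval *interval
	appendOrMergeWithLastInterval := func(thisInterval interval) {
		if lastInterval == nil || thisInterval.First > lastInterval.Last+1 {
			result = append(result, thisInterval)
			lastInterval = &result[len(result)-1]
		} else if lastInterval.Last < thisInterval.Last {
			lastInterval.Last = thisInterval.Last
		}
	}
	for i < len(oi) && j < len(newIndexes) {
		if oi[i].First < newIndexes[j] {
			appendOrMergeWithLastInterval(oi[i])
			i++
		} else {
			appendOrMergeWithLastInterval(interval{newIndexes[j], newIndexes[j]})
			j++
		}
	}
	for i < len(oi) {
		appendOrMergeWithLastInterval(oi[i])
		i++
	}
	for j < len(newIndexes) {
		appendOrMergeWithLastInterval(interval{newIndexes[j], newIndexes[j]})
		j++
	}
	return result
}

func main() {
	// Previously recorded in .status.completedIndexes: 0, 4-5, 10-12.
	prev := orderedIntervals{{0, 0}, {4, 5}, {10, 12}}
	// Indexes 2, 7 and 8 succeeded since the last sync.
	merged := prev.withOrderedIndexes([]int{2, 7, 8})
	fmt.Println(merged) // [{0 0} {2 2} {4 5} {7 8} {10 12}]
}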
// total returns number of indexes contained in the intervals.
func (oi orderedIntervals) total() int {
var count int
for _, iv := range oi {
count += iv.Last - iv.First + 1
}
return count
}
+func (oi orderedIntervals) String() string {
+	var builder strings.Builder
+	for _, v := range oi {
 		if builder.Len() > 0 {
 			builder.WriteRune(',')
 		}
-		builder.WriteString(strconv.Itoa(first))
-		if last > first {
-			if last == first+1 {
+		builder.WriteString(strconv.Itoa(v.First))
+		if v.Last > v.First {
+			if v.Last == v.First+1 {
 				builder.WriteRune(',')
 			} else {
 				builder.WriteRune('-')
 			}
-			builder.WriteString(strconv.Itoa(last))
+			builder.WriteString(strconv.Itoa(v.Last))
 		}
 	}
+	return builder.String()
+}
func (oi orderedIntervals) has(ix int) bool {
lo := 0
hi := len(oi)
// Invariant: oi[hi].Last >= ix
for hi > lo {
mid := lo + (hi-lo)/2
if oi[mid].Last >= ix {
hi = mid
} else {
lo = mid + 1
}
}
if hi == len(oi) {
return false
}
return oi[hi].First <= ix
}
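A small runnable sketch (illustrative only) of the binary search over interval upper bounds, using the same intervals as the TestIntervalsHaveIndex cases below; the has method is copied from the hunk above.

package main

import "fmt"

type interval struct{ First, Last int }

type orderedIntervals []interval

// has reports whether ix falls inside one of the ordered intervals by
// binary-searching for the first interval whose Last bound is >= ix.
func (oi orderedIntervals) has(ix int) bool {
	lo, hi := 0, len(oi)
	for hi > lo {
		mid := lo + (hi-lo)/2
		if oi[mid].Last >= ix {
			hi = mid
		} else {
			lo = mid + 1
		}
	}
	if hi == len(oi) {
		return false
	}
	return oi[hi].First <= ix
}

func main() {
	oi := orderedIntervals{{2, 4}, {6, 8}}
	fmt.Println(oi.has(2), oi.has(5), oi.has(8), oi.has(9)) // true false true false
}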
func succeededIndexesFromJob(job *batch.Job) orderedIntervals {
if job.Status.CompletedIndexes == "" {
return nil
}
var result orderedIntervals
var lastInterval *interval
completions := int(*job.Spec.Completions)
for _, intervalStr := range strings.Split(job.Status.CompletedIndexes, ",") {
limitsStr := strings.Split(intervalStr, "-")
var inter interval
var err error
inter.First, err = strconv.Atoi(limitsStr[0])
if err != nil {
klog.InfoS("Corrupted completed indexes interval, ignoring", "job", klog.KObj(job), "interval", intervalStr, "err", err)
continue
}
if inter.First >= completions {
break
}
if len(limitsStr) > 1 {
inter.Last, err = strconv.Atoi(limitsStr[1])
if err != nil {
klog.InfoS("Corrupted completed indexes interval, ignoring", "job", klog.KObj(job), "interval", intervalStr, "err", err)
continue
}
if inter.Last >= completions {
inter.Last = completions - 1
}
} else {
inter.Last = inter.First
}
if lastInterval != nil && lastInterval.Last == inter.First-1 {
lastInterval.Last = inter.Last
} else {
result = append(result, inter)
lastInterval = &result[len(result)-1]
}
}
return result
}
 // firstPendingIndexes returns `count` indexes less than `completions` that are
-// not covered by running or succeeded pods.
-func firstPendingIndexes(pods []*v1.Pod, count, completions int) []int {
+// not covered by `activePods` or `succeededIndexes`.
+func firstPendingIndexes(activePods []*v1.Pod, succeededIndexes orderedIntervals, count, completions int) []int {
 	if count == 0 {
 		return nil
 	}
-	nonPending := sets.NewInt()
-	for _, p := range pods {
-		if p.Status.Phase == v1.PodSucceeded || controller.IsPodActive(p) {
-			ix := getCompletionIndex(p.Annotations)
-			if ix != unknownCompletionIndex {
-				nonPending.Insert(ix)
-			}
-		}
+	active := sets.NewInt()
+	for _, p := range activePods {
+		ix := getCompletionIndex(p.Annotations)
+		if ix != unknownCompletionIndex {
+			active.Insert(ix)
+		}
 	}
 	result := make([]int, 0, count)
-	// The following algorithm is bounded by the number of non pending pods and
-	// parallelism.
-	// TODO(#99368): Convert the list of non-pending pods into a set of
-	// non-pending intervals from the job's .status.completedIndexes and active
-	// pods.
+	nonPending := succeededIndexes.withOrderedIndexes(active.List())
+	// The following algorithm is bounded by len(nonPending) and count.
 	candidate := 0
-	for _, np := range nonPending.List() {
-		for ; candidate < np && candidate < completions; candidate++ {
+	for _, sInterval := range nonPending {
+		for ; candidate < completions && len(result) < count && candidate < sInterval.First; candidate++ {
 			result = append(result, candidate)
-			if len(result) == count {
-				return result
-			}
 		}
-		candidate = np + 1
+		if candidate < sInterval.Last+1 {
+			candidate = sInterval.Last + 1
+		}
 	}
 	for ; candidate < completions && len(result) < count; candidate++ {
 		result = append(result, candidate)
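To make the candidate walk concrete, here is a standalone sketch (not from the commit) that runs just the selection loop; the nonPending intervals {2-5} and {8-9} are what withOrderedIndexes produces for the "mixed" test case below (active indexes 3, 5 and 8 merged into succeeded intervals 2-4 and 9).

package main

import "fmt"

type interval struct{ First, Last int }

// pickPending mirrors the selection loop of firstPendingIndexes, with the
// merged nonPending intervals precomputed (illustrative only).
func pickPending(nonPending []interval, count, completions int) []int {
	result := make([]int, 0, count)
	candidate := 0
	for _, sInterval := range nonPending {
		// Emit every index before the interval, then jump past it.
		for ; candidate < completions && len(result) < count && candidate < sInterval.First; candidate++ {
			result = append(result, candidate)
		}
		if candidate < sInterval.Last+1 {
			candidate = sInterval.Last + 1
		}
	}
	// Fill the remainder after the last non-pending interval.
	for ; candidate < completions && len(result) < count; candidate++ {
		result = append(result, candidate)
	}
	return result
}

func main() {
	nonPending := []interval{{2, 5}, {8, 9}}
	fmt.Println(pickPending(nonPending, 5, 20)) // [0 1 6 7 10]
}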

View File

@ -22,22 +22,30 @@ import (
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1" batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/util/feature"
featuregatetesting "k8s.io/component-base/featuregate/testing"
"k8s.io/kubernetes/pkg/features"
"k8s.io/utils/pointer"
) )
const noIndex = "-" const noIndex = "-"
func TestCalculateSucceededIndexes(t *testing.T) { func TestCalculateSucceededIndexes(t *testing.T) {
cases := map[string]struct { cases := map[string]struct {
prevSucceeded string
pods []indexPhase pods []indexPhase
wantCount int32
completions int32 completions int32
trackingWithFinalizers bool
wantStatusIntervals orderedIntervals
wantIntervals orderedIntervals
}{ }{
"1": { "one index": {
pods: []indexPhase{{"1", v1.PodSucceeded}}, pods: []indexPhase{{"1", v1.PodSucceeded}},
wantCount: 1,
completions: 2, completions: 2,
wantIntervals: []interval{{1, 1}},
}, },
"5,10": { "two separate": {
pods: []indexPhase{ pods: []indexPhase{
{"2", v1.PodFailed}, {"2", v1.PodFailed},
{"5", v1.PodSucceeded}, {"5", v1.PodSucceeded},
@ -45,10 +53,10 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"10", v1.PodFailed}, {"10", v1.PodFailed},
{"10", v1.PodSucceeded}, {"10", v1.PodSucceeded},
}, },
wantCount: 2,
completions: 11, completions: 11,
wantIntervals: []interval{{5, 5}, {10, 10}},
}, },
"2,3,5-7": { "two intervals": {
pods: []indexPhase{ pods: []indexPhase{
{"0", v1.PodRunning}, {"0", v1.PodRunning},
{"1", v1.PodPending}, {"1", v1.PodPending},
@ -58,10 +66,11 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"6", v1.PodSucceeded}, {"6", v1.PodSucceeded},
{"7", v1.PodSucceeded}, {"7", v1.PodSucceeded},
}, },
wantCount: 5,
completions: 8, completions: 8,
wantIntervals: []interval{{2, 3}, {5, 7}},
}, },
"0-2": { "one interval, ignore previous": {
prevSucceeded: "3-5",
pods: []indexPhase{ pods: []indexPhase{
{"0", v1.PodSucceeded}, {"0", v1.PodSucceeded},
{"1", v1.PodFailed}, {"1", v1.PodFailed},
@ -70,10 +79,10 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{"2", v1.PodSucceeded}, {"2", v1.PodSucceeded},
{"3", v1.PodFailed}, {"3", v1.PodFailed},
}, },
wantCount: 3,
completions: 4, completions: 4,
wantIntervals: []interval{{0, 2}},
}, },
"0,2-5": { "one index and one interval": {
pods: []indexPhase{ pods: []indexPhase{
{"0", v1.PodSucceeded}, {"0", v1.PodSucceeded},
{"1", v1.PodFailed}, {"1", v1.PodFailed},
@ -84,10 +93,10 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{noIndex, v1.PodSucceeded}, {noIndex, v1.PodSucceeded},
{"-2", v1.PodSucceeded}, {"-2", v1.PodSucceeded},
}, },
wantCount: 5,
completions: 6, completions: 6,
wantIntervals: []interval{{0, 0}, {2, 5}},
}, },
"0-2,4": { "out of range": {
pods: []indexPhase{ pods: []indexPhase{
{"0", v1.PodSucceeded}, {"0", v1.PodSucceeded},
{"1", v1.PodSucceeded}, {"1", v1.PodSucceeded},
@ -98,19 +107,184 @@ func TestCalculateSucceededIndexes(t *testing.T) {
{noIndex, v1.PodSucceeded}, {noIndex, v1.PodSucceeded},
{"-2", v1.PodSucceeded}, {"-2", v1.PodSucceeded},
}, },
wantCount: 4,
completions: 5, completions: 5,
wantIntervals: []interval{{0, 2}, {4, 4}},
},
"prev interval out of range": {
prevSucceeded: "0-5,8-10",
completions: 8,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{{0, 5}},
wantIntervals: []interval{{0, 5}},
},
"prev interval partially out of range": {
prevSucceeded: "0-5,8-10",
completions: 10,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{{0, 5}, {8, 9}},
wantIntervals: []interval{{0, 5}, {8, 9}},
},
"prev and new separate": {
prevSucceeded: "0,4,5,10-12",
pods: []indexPhase{
{"2", v1.PodSucceeded},
{"7", v1.PodSucceeded},
{"8", v1.PodSucceeded},
},
completions: 13,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{0, 0},
{4, 5},
{10, 12},
},
wantIntervals: []interval{
{0, 0},
{2, 2},
{4, 5},
{7, 8},
{10, 12},
},
},
"prev between new": {
prevSucceeded: "3,4,6",
pods: []indexPhase{
{"2", v1.PodSucceeded},
{"7", v1.PodSucceeded},
{"8", v1.PodSucceeded},
},
completions: 9,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{3, 4},
{6, 6},
},
wantIntervals: []interval{
{2, 4},
{6, 8},
},
},
"new between prev": {
prevSucceeded: "2,7,8",
pods: []indexPhase{
{"3", v1.PodSucceeded},
{"4", v1.PodSucceeded},
{"6", v1.PodSucceeded},
},
completions: 9,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{2, 2},
{7, 8},
},
wantIntervals: []interval{
{2, 4},
{6, 8},
},
},
"new within prev": {
prevSucceeded: "2-7",
pods: []indexPhase{
{"0", v1.PodSucceeded},
{"3", v1.PodSucceeded},
{"5", v1.PodSucceeded},
{"9", v1.PodSucceeded},
},
completions: 10,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{2, 7},
},
wantIntervals: []interval{
{0, 0},
{2, 7},
{9, 9},
},
},
"corrupted interval": {
prevSucceeded: "0,1-foo,bar",
pods: []indexPhase{
{"3", v1.PodSucceeded},
},
completions: 4,
trackingWithFinalizers: true,
wantStatusIntervals: []interval{
{0, 0},
},
wantIntervals: []interval{
{0, 0},
{3, 3},
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, tc.trackingWithFinalizers)()
job := &batch.Job{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
batch.JobTrackingFinalizer: "",
},
},
Status: batch.JobStatus{
CompletedIndexes: tc.prevSucceeded,
},
Spec: batch.JobSpec{
Completions: pointer.Int32Ptr(tc.completions),
}, },
} }
for want, tc := range cases {
t.Run(want, func(t *testing.T) {
pods := hollowPodsWithIndexPhase(tc.pods) pods := hollowPodsWithIndexPhase(tc.pods)
gotStr, gotCnt := calculateSucceededIndexes(pods, tc.completions) for _, p := range pods {
if diff := cmp.Diff(want, gotStr); diff != "" { p.Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
}
gotStatusIntervals, gotIntervals := calculateSucceededIndexes(job, pods)
if diff := cmp.Diff(tc.wantStatusIntervals, gotStatusIntervals); diff != "" {
t.Errorf("Unexpected completed indexes from status (-want,+got):\n%s", diff)
}
if diff := cmp.Diff(tc.wantIntervals, gotIntervals); diff != "" {
t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff) t.Errorf("Unexpected completed indexes (-want,+got):\n%s", diff)
} }
if gotCnt != tc.wantCount { })
t.Errorf("Got number of completed indexes %d, want %d", gotCnt, tc.wantCount) }
}
func TestIntervalsHaveIndex(t *testing.T) {
cases := map[string]struct {
intervals orderedIntervals
index int
wantHas bool
}{
"empty": {
index: 4,
},
"before all": {
index: 1,
intervals: []interval{{2, 4}, {5, 7}},
},
"after all": {
index: 9,
intervals: []interval{{2, 4}, {6, 8}},
},
"in between": {
index: 5,
intervals: []interval{{2, 4}, {6, 8}},
},
"in first": {
index: 2,
intervals: []interval{{2, 4}, {6, 8}},
wantHas: true,
},
"in second": {
index: 8,
intervals: []interval{{2, 4}, {6, 8}},
wantHas: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
has := tc.intervals.has(tc.index)
if has != tc.wantHas {
t.Errorf("intervalsHaveIndex(_, _) = %t, want %t", has, tc.wantHas)
} }
}) })
} }
@ -120,7 +294,8 @@ func TestFirstPendingIndexes(t *testing.T) {
cases := map[string]struct { cases := map[string]struct {
cnt int cnt int
completions int completions int
pods []indexPhase activePods []indexPhase
succeededIndexes []interval
want []int want []int
}{ }{
"cnt greater than completions": { "cnt greater than completions": {
@ -133,46 +308,42 @@ func TestFirstPendingIndexes(t *testing.T) {
completions: 5, completions: 5,
want: []int{0, 1}, want: []int{0, 1},
}, },
"first pods running or succeeded": { "first pods active": {
pods: []indexPhase{ activePods: []indexPhase{
{"0", v1.PodRunning}, {"0", v1.PodRunning},
{"1", v1.PodPending}, {"1", v1.PodPending},
{"2", v1.PodFailed},
}, },
cnt: 3, cnt: 3,
completions: 10, completions: 10,
want: []int{2, 3, 4}, want: []int{2, 3, 4},
}, },
"last pods running or succeeded": { "last pods active or succeeded": {
pods: []indexPhase{ activePods: []indexPhase{
{"4", v1.PodFailed},
{"5", v1.PodSucceeded},
{"6", v1.PodPending}, {"6", v1.PodPending},
}, },
succeededIndexes: []interval{{4, 5}},
cnt: 6, cnt: 6,
completions: 6, completions: 6,
want: []int{0, 1, 2, 3, 4}, want: []int{0, 1, 2, 3},
}, },
"mixed": { "mixed": {
pods: []indexPhase{ activePods: []indexPhase{
{"1", v1.PodFailed},
{"2", v1.PodSucceeded},
{"3", v1.PodPending}, {"3", v1.PodPending},
{"5", v1.PodFailed},
{"5", v1.PodRunning}, {"5", v1.PodRunning},
{"8", v1.PodPending}, {"8", v1.PodPending},
{noIndex, v1.PodRunning}, {noIndex, v1.PodRunning},
{"-3", v1.PodRunning}, {"-3", v1.PodRunning},
}, },
succeededIndexes: []interval{{2, 4}, {9, 9}},
cnt: 5, cnt: 5,
completions: 10, completions: 20,
want: []int{0, 1, 4, 6, 7}, want: []int{0, 1, 6, 7, 10},
}, },
} }
for name, tc := range cases { for name, tc := range cases {
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
pods := hollowPodsWithIndexPhase(tc.pods) pods := hollowPodsWithIndexPhase(tc.activePods)
got := firstPendingIndexes(pods, tc.cnt, tc.completions) got := firstPendingIndexes(pods, tc.succeededIndexes, tc.cnt, tc.completions)
if diff := cmp.Diff(tc.want, got); diff != "" { if diff := cmp.Diff(tc.want, got); diff != "" {
t.Errorf("Wrong first pending indexes (-want,+got):\n%s", diff) t.Errorf("Wrong first pending indexes (-want,+got):\n%s", diff)
} }

View File

@@ -31,9 +31,12 @@ import (
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/json"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
-	utilfeature "k8s.io/apiserver/pkg/util/feature"
+	"k8s.io/apiserver/pkg/util/feature"
 	batchinformers "k8s.io/client-go/informers/batch/v1"
 	coreinformers "k8s.io/client-go/informers/core/v1"
 	clientset "k8s.io/client-go/kubernetes"
@ -52,7 +55,14 @@ import (
"k8s.io/utils/integer" "k8s.io/utils/integer"
) )
-const statusUpdateRetries = 3
+const (
+	statusUpdateRetries = 3
+
+	// maxUncountedPods is the maximum size the slices in
+	// .status.uncountedTerminatedPods should have to keep their representation
+	// roughly below 20 KB.
+	maxUncountedPods = 500
+)
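As a rough check on that budget (an estimate, not taken from the commit): a UID serializes as a 36-character string, so with quotes and separators each entry costs roughly 40 bytes in JSON, and 500 entries spread across the succeeded and failed lists come to about 20 KB, which is why the controller flushes the status once the combined length reaches maxUncountedPods.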
// controllerKind contains the schema.GroupVersionKind for this controller type. // controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = batch.SchemeGroupVersion.WithKind("Job") var controllerKind = batch.SchemeGroupVersion.WithKind("Job")
@ -71,9 +81,11 @@ type Controller struct {
kubeClient clientset.Interface kubeClient clientset.Interface
podControl controller.PodControlInterface podControl controller.PodControlInterface
-	// To allow injection of updateJobStatus for testing.
-	updateHandler func(job *batch.Job) error
+	// To allow injection of the following for testing.
+	updateStatusHandler func(job *batch.Job) error
+	patchJobHandler     func(job *batch.Job, patch []byte) error
 	syncHandler func(jobKey string) (bool, error)
// podStoreSynced returns true if the pod store has been synced at least once. // podStoreSynced returns true if the pod store has been synced at least once.
// Added as a member to the struct to allow injection for testing. // Added as a member to the struct to allow injection for testing.
podStoreSynced cache.InformerSynced podStoreSynced cache.InformerSynced
@ -93,6 +105,9 @@ type Controller struct {
// Jobs that need to be updated // Jobs that need to be updated
queue workqueue.RateLimitingInterface queue workqueue.RateLimitingInterface
// Orphan deleted pods that still have a Job tracking finalizer to be removed
orphanQueue workqueue.RateLimitingInterface
recorder record.EventRecorder recorder record.EventRecorder
} }
@ -115,6 +130,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
}, },
expectations: controller.NewControllerExpectations(), expectations: controller.NewControllerExpectations(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"), queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job"),
orphanQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(DefaultJobBackOff, MaxJobBackOff), "job_orphan_pod"),
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}), recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
} }
@ -138,7 +154,8 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
jm.podStore = podInformer.Lister() jm.podStore = podInformer.Lister()
jm.podStoreSynced = podInformer.Informer().HasSynced jm.podStoreSynced = podInformer.Informer().HasSynced
-	jm.updateHandler = jm.updateJobStatus
+	jm.updateStatusHandler = jm.updateJobStatus
+	jm.patchJobHandler = jm.patchJob
 	jm.syncHandler = jm.syncJob
metrics.Register() metrics.Register()
@ -150,6 +167,7 @@ func NewController(podInformer coreinformers.PodInformer, jobInformer batchinfor
func (jm *Controller) Run(workers int, stopCh <-chan struct{}) { func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer jm.queue.ShutDown() defer jm.queue.ShutDown()
defer jm.orphanQueue.ShutDown()
klog.Infof("Starting job controller") klog.Infof("Starting job controller")
defer klog.Infof("Shutting down job controller") defer klog.Infof("Shutting down job controller")
@ -162,6 +180,8 @@ func (jm *Controller) Run(workers int, stopCh <-chan struct{}) {
go wait.Until(jm.worker, time.Second, stopCh) go wait.Until(jm.worker, time.Second, stopCh)
} }
go wait.Until(jm.orphanWorker, time.Second, stopCh)
<-stopCh <-stopCh
} }
@ -292,7 +312,7 @@ func (jm *Controller) updatePod(old, cur interface{}) {
} }
// When a pod is deleted, enqueue the job that manages the pod and update its expectations. // When a pod is deleted, enqueue the job that manages the pod and update its expectations.
-// obj could be an *v1.Pod, or a DeletionFinalStateUnknown marker item.
+// obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item.
func (jm *Controller) deletePod(obj interface{}) { func (jm *Controller) deletePod(obj interface{}) {
pod, ok := obj.(*v1.Pod) pod, ok := obj.(*v1.Pod)
@ -320,6 +340,9 @@ func (jm *Controller) deletePod(obj interface{}) {
} }
job := jm.resolveControllerRef(pod.Namespace, controllerRef) job := jm.resolveControllerRef(pod.Namespace, controllerRef)
if job == nil { if job == nil {
if hasJobTrackingFinalizer(pod) {
jm.enqueueOrphanPod(pod)
}
return return
} }
jobKey, err := controller.KeyFunc(job) jobKey, err := controller.KeyFunc(job)
@ -380,9 +403,19 @@ func (jm *Controller) enqueueController(obj interface{}, immediate bool) {
// all controllers there will still be some replica instability. One way to handle this is // all controllers there will still be some replica instability. One way to handle this is
// by querying the store for all controllers that this rc overlaps, as well as all // by querying the store for all controllers that this rc overlaps, as well as all
// controllers that overlap this rc, and sorting them. // controllers that overlap this rc, and sorting them.
klog.Infof("enqueueing job %s", key)
jm.queue.AddAfter(key, backoff) jm.queue.AddAfter(key, backoff)
} }
func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
}
jm.orphanQueue.Add(key)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done. // worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key. // It enforces that the syncHandler is never invoked concurrently with the same key.
func (jm *Controller) worker() { func (jm *Controller) worker() {
@ -411,10 +444,61 @@ func (jm *Controller) processNextWorkItem() bool {
return true return true
} }
func (jm *Controller) orphanWorker() {
for jm.processNextOrphanPod() {
}
}
func (jm Controller) processNextOrphanPod() bool {
key, quit := jm.orphanQueue.Get()
if quit {
return false
}
defer jm.orphanQueue.Done(key)
err := jm.syncOrphanPod(key.(string))
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing orphan pod: %v", err))
jm.orphanQueue.AddRateLimited(key)
} else {
jm.orphanQueue.Forget(key)
}
return true
}
// syncOrphanPod removes the tracking finalizer from an orphan pod if found.
func (jm Controller) syncOrphanPod(key string) error {
startTime := time.Now()
defer func() {
klog.V(4).Infof("Finished syncing orphan pod %q (%v)", key, time.Since(startTime))
}()
ns, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
sharedPod, err := jm.podStore.Pods(ns).Get(name)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("Orphan pod has been deleted: %v", key)
return nil
}
return err
}
if patch := removeTrackingFinalizerPatch(sharedPod); patch != nil {
if err := jm.podControl.PatchPod(ns, name, patch); err != nil && !apierrors.IsNotFound(err) {
return err
}
}
return nil
}
 // getPodsForJob returns the set of pods that this Job should manage.
-// It also reconciles ControllerRef by adopting/orphaning.
+// It also reconciles ControllerRef by adopting/orphaning, adding tracking
+// finalizers, if enabled.
 // Note that the returned Pods are pointers into the cache.
-func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
+func (jm *Controller) getPodsForJob(j *batch.Job, withFinalizers bool) ([]*v1.Pod, error) {
 	selector, err := metav1.LabelSelectorAsSelector(j.Spec.Selector)
 	if err != nil {
 		return nil, fmt.Errorf("couldn't convert Job selector: %v", err)
@@ -437,8 +521,31 @@ func (jm *Controller) getPodsForJob(j *batch.Job) ([]*v1.Pod, error) {
 		}
 		return fresh, nil
 	})
-	cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc)
-	return cm.ClaimPods(pods)
+	var finalizers []string
+	if withFinalizers {
+		finalizers = append(finalizers, batch.JobTrackingFinalizer)
+	}
+	cm := controller.NewPodControllerRefManager(jm.podControl, j, selector, controllerKind, canAdoptFunc, finalizers...)
+	// When adopting Pods, this operation adds an ownerRef and finalizers.
+	pods, err = cm.ClaimPods(pods)
+	if err != nil || !withFinalizers {
+		return pods, err
+	}
+	// Set finalizer on adopted pods for the remaining calculations.
+	for i, p := range pods {
+		adopted := true
+		for _, r := range p.OwnerReferences {
+			if r.UID == j.UID {
+				adopted = false
+				break
+			}
+		}
+		if adopted && !hasJobTrackingFinalizer(p) {
+			pods[i] = p.DeepCopy()
+			pods[i].Finalizers = append(p.Finalizers, batch.JobTrackingFinalizer)
+		}
+	}
+	return pods, err
 }
// syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning // syncJob will sync the job with the given key if it has had its expectations fulfilled, meaning
@ -475,7 +582,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
} }
// Cannot create Pods if this is an Indexed Job and the feature is disabled. // Cannot create Pods if this is an Indexed Job and the feature is disabled.
-	if !utilfeature.DefaultFeatureGate.Enabled(features.IndexedJob) && isIndexedJob(&job) {
+	if !feature.DefaultFeatureGate.Enabled(features.IndexedJob) && isIndexedJob(&job) {
jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled", "Skipped Indexed Job sync because feature is disabled.") jm.recorder.Event(&job, v1.EventTypeWarning, "IndexedJobDisabled", "Skipped Indexed Job sync because feature is disabled.")
return false, nil return false, nil
} }
@ -500,18 +607,32 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc() metrics.JobSyncNum.WithLabelValues(completionMode, result, action).Inc()
}() }()
var uncounted *uncountedTerminatedPods
if trackingUncountedPods(&job) {
klog.V(4).InfoS("Tracking uncounted Pods with pod finalizers", "job", klog.KObj(&job))
if job.Status.UncountedTerminatedPods == nil {
job.Status.UncountedTerminatedPods = &batch.UncountedTerminatedPods{}
}
uncounted = newUncountedTerminatedPods(*job.Status.UncountedTerminatedPods)
} else if patch := removeTrackingAnnotationPatch(&job); patch != nil {
if err := jm.patchJobHandler(&job, patch); err != nil {
return false, fmt.Errorf("removing tracking finalizer from job %s: %w", key, err)
}
}
// Check the expectations of the job before counting active pods, otherwise a new pod can sneak in // Check the expectations of the job before counting active pods, otherwise a new pod can sneak in
// and update the expectations after we've retrieved active pods from the store. If a new pod enters // and update the expectations after we've retrieved active pods from the store. If a new pod enters
// the store after we've checked the expectation, the job sync is just deferred till the next relist. // the store after we've checked the expectation, the job sync is just deferred till the next relist.
jobNeedsSync := jm.expectations.SatisfiedExpectations(key) jobNeedsSync := jm.expectations.SatisfiedExpectations(key)
-	pods, err := jm.getPodsForJob(&job)
+	pods, err := jm.getPodsForJob(&job, uncounted != nil)
if err != nil { if err != nil {
return false, err return false, err
} }
activePods := controller.FilterActivePods(pods) activePods := controller.FilterActivePods(pods)
active := int32(len(activePods)) active := int32(len(activePods))
-	succeeded, failed := getStatus(&job, pods)
+	succeeded, failed := getStatus(&job, pods, uncounted)
// Job first start. Set StartTime and start the ActiveDeadlineSeconds timer // Job first start. Set StartTime and start the ActiveDeadlineSeconds timer
// only if the job is not in the suspended state. // only if the job is not in the suspended state.
if job.Status.StartTime == nil && !jobSuspended(&job) { if job.Status.StartTime == nil && !jobSuspended(&job) {
@ -526,53 +647,49 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
} }
 	var manageJobErr error
-	jobFailed := false
-	var failureReason string
-	var failureMessage string
+	var finishedCondition *batch.JobCondition

-	jobHaveNewFailure := failed > job.Status.Failed
+	jobHasNewFailure := failed > job.Status.Failed
 	// new failures happen when status does not reflect the failures and active
 	// is different than parallelism, otherwise the previous controller loop
 	// failed updating status so even if we pick up failure it is not a new one
-	exceedsBackoffLimit := jobHaveNewFailure && (active != *job.Spec.Parallelism) &&
+	exceedsBackoffLimit := jobHasNewFailure && (active != *job.Spec.Parallelism) &&
 		(failed > *job.Spec.BackoffLimit)

 	if exceedsBackoffLimit || pastBackoffLimitOnFailure(&job, pods) {
 		// check if the number of pod restart exceeds backoff (for restart OnFailure only)
 		// OR if the number of failed jobs increased since the last syncJob
-		jobFailed = true
-		failureReason = "BackoffLimitExceeded"
-		failureMessage = "Job has reached the specified backoff limit"
+		finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "BackoffLimitExceeded", "Job has reached the specified backoff limit")
 	} else if pastActiveDeadline(&job) {
-		jobFailed = true
-		failureReason = "DeadlineExceeded"
-		failureMessage = "Job was active longer than specified deadline"
+		finishedCondition = newCondition(batch.JobFailed, v1.ConditionTrue, "DeadlineExceeded", "Job was active longer than specified deadline")
 	}

-	var succeededIndexes string
+	var prevSucceededIndexes, succeededIndexes orderedIntervals
 	if isIndexedJob(&job) {
-		succeededIndexes, succeeded = calculateSucceededIndexes(pods, *job.Spec.Completions)
+		prevSucceededIndexes, succeededIndexes = calculateSucceededIndexes(&job, pods)
+		succeeded = int32(succeededIndexes.total())
 	}
-	jobConditionsChanged := false
-	manageJobCalled := false
-	if jobFailed {
-		// TODO(#28486): Account for pod failures in status once we can track
-		// completions without lingering pods.
-		_, manageJobErr = jm.deleteJobPods(&job, "", activePods)
-		// update status values accordingly
-		failed += active
-		active = 0
-		job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobFailed, v1.ConditionTrue, failureReason, failureMessage))
-		jobConditionsChanged = true
-		jm.recorder.Event(&job, v1.EventTypeWarning, failureReason, failureMessage)
-		metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc()
+	suspendCondChanged := false
+	// Remove active pods if Job failed.
+	if finishedCondition != nil {
+		deleted, err := jm.deleteActivePods(&job, activePods)
+		if uncounted == nil {
+			// Legacy behavior: pretend all active pods were successfully removed.
+			deleted = active
+		} else if deleted != active {
+			// Can't declare the Job as finished yet, as there might be remaining
+			// pod finalizers.
+			finishedCondition = nil
+		}
+		active -= deleted
+		failed += deleted
+		manageJobErr = err
 	} else {
+		manageJobCalled := false
 		if jobNeedsSync && job.DeletionTimestamp == nil {
-			active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, pods)
+			active, action, manageJobErr = jm.manageJob(&job, activePods, succeeded, succeededIndexes)
 			manageJobCalled = true
 		}
-		completions := succeeded
 		complete := false
 		if job.Spec.Completions == nil {
 			// This type of job is complete when any pod exits with success.
@@ -581,29 +698,17 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
 			// not expected to fail, but if they do, the failure is ignored. Once any
 			// pod succeeds, the controller waits for remaining pods to finish, and
 			// then the job is complete.
-			if succeeded > 0 && active == 0 {
-				complete = true
-			}
+			complete = succeeded > 0 && active == 0
 		} else {
 			// Job specifies a number of completions. This type of job signals
 			// success by having that number of successes. Since we do not
 			// start more pods than there are remaining completions, there should
 			// not be any remaining active pods once this count is reached.
-			if completions >= *job.Spec.Completions && active == 0 {
-				complete = true
-				if completions > *job.Spec.Completions {
-					jm.recorder.Event(&job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
-				}
-			}
+			complete = succeeded >= *job.Spec.Completions && active == 0
 		}
 		if complete {
-			job.Status.Conditions = append(job.Status.Conditions, newCondition(batch.JobComplete, v1.ConditionTrue, "", ""))
-			jobConditionsChanged = true
-			now := metav1.Now()
-			job.Status.CompletionTime = &now
-			jm.recorder.Event(&job, v1.EventTypeNormal, "Completed", "Job completed")
-			metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
-		} else if utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
+			finishedCondition = newCondition(batch.JobComplete, v1.ConditionTrue, "", "")
+		} else if feature.DefaultFeatureGate.Enabled(features.SuspendJob) && manageJobCalled {
 			// Update the conditions / emit events only if manageJob was called in
// Update the conditions / emit events only if manageJob was called in // Update the conditions / emit events only if manageJob was called in
// this syncJob. Otherwise wait for the right syncJob call to make // this syncJob. Otherwise wait for the right syncJob call to make
// updates. // updates.
@ -612,7 +717,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
var isUpdated bool var isUpdated bool
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended") job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionTrue, "JobSuspended", "Job suspended")
if isUpdated { if isUpdated {
-				jobConditionsChanged = true
+				suspendCondChanged = true
jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended") jm.recorder.Event(&job, v1.EventTypeNormal, "Suspended", "Job suspended")
} }
} else { } else {
@ -620,7 +725,7 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
var isUpdated bool var isUpdated bool
job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed") job.Status.Conditions, isUpdated = ensureJobConditionStatus(job.Status.Conditions, batch.JobSuspended, v1.ConditionFalse, "JobResumed", "Job resumed")
if isUpdated { if isUpdated {
-				jobConditionsChanged = true
+				suspendCondChanged = true
jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed") jm.recorder.Event(&job, v1.EventTypeNormal, "Resumed", "Job resumed")
// Resumed jobs will always reset StartTime to current time. This is // Resumed jobs will always reset StartTime to current time. This is
// done because the ActiveDeadlineSeconds timer shouldn't go off // done because the ActiveDeadlineSeconds timer shouldn't go off
@ -644,20 +749,44 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
forget = true forget = true
} }
if uncounted != nil {
needsStatusUpdate := suspendCondChanged || active != job.Status.Active
job.Status.Active = active
err = jm.trackJobStatusAndRemoveFinalizers(&job, pods, prevSucceededIndexes, *uncounted, finishedCondition, needsStatusUpdate)
if err != nil {
return false, err
}
jobFinished := IsJobFinished(&job)
if jobHasNewFailure && !jobFinished {
// returning an error will re-enqueue Job after the backoff period
return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
}
forget = true
return forget, manageJobErr
}
// Legacy path: tracking without finalizers.
// Ensure that there are no leftover tracking finalizers.
if err := jm.removeTrackingFinalizersFromAllPods(pods); err != nil {
return false, fmt.Errorf("removing disabled finalizers from job pods %s: %w", key, err)
}
 	// no need to update the job if the status hasn't changed since last time
-	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || jobConditionsChanged {
+	if job.Status.Active != active || job.Status.Succeeded != succeeded || job.Status.Failed != failed || suspendCondChanged || finishedCondition != nil {
 		job.Status.Active = active
 		job.Status.Succeeded = succeeded
 		job.Status.Failed = failed
 		if isIndexedJob(&job) {
-			job.Status.CompletedIndexes = succeededIndexes
+			job.Status.CompletedIndexes = succeededIndexes.String()
 		}
+		job.Status.UncountedTerminatedPods = nil
+		jm.enactJobFinished(&job, finishedCondition)

-		if err := jm.updateHandler(&job); err != nil {
+		if err := jm.updateStatusHandler(&job); err != nil {
 			return forget, err
 		}

-		if jobHaveNewFailure && !IsJobFinished(&job) {
+		if jobHasNewFailure && !IsJobFinished(&job) {
 			// returning an error will re-enqueue Job after the backoff period
 			return forget, fmt.Errorf("failed pod(s) detected for job key %q", key)
 		}
@ -668,9 +797,12 @@ func (jm *Controller) syncJob(key string) (forget bool, rErr error) {
return forget, manageJobErr return forget, manageJobErr
} }
-// deleteJobPods deletes the pods, returns the number of successful removals
-// and any error.
-func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
+// deleteActivePods issues deletion for active Pods, preserving finalizers.
+// This is done through DELETE calls that set deletion timestamps.
+// The method trackJobStatusAndRemoveFinalizers removes the finalizers, after
+// which the objects can actually be deleted.
+// Returns the number of successfully issued deletions.
+func (jm *Controller) deleteActivePods(job *batch.Job, pods []*v1.Pod) (int32, error) {
errCh := make(chan error, len(pods)) errCh := make(chan error, len(pods))
successfulDeletes := int32(len(pods)) successfulDeletes := int32(len(pods))
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
@ -678,11 +810,26 @@ func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Po
 	for i := range pods {
 		go func(pod *v1.Pod) {
 			defer wg.Done()
-			if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil {
-				// Decrement the expected number of deletes because the informer won't observe this deletion
-				if jobKey != "" {
-					jm.expectations.DeletionObserved(jobKey)
-				}
+			if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil && !apierrors.IsNotFound(err) {
+				atomic.AddInt32(&successfulDeletes, -1)
+				errCh <- err
+				utilruntime.HandleError(err)
+			}
+		}(pods[i])
+	}
+	wg.Wait()
+	return successfulDeletes, errorFromChannel(errCh)
+}
// deleteJobPods deletes the pods, returns the number of successful removals
// and any error.
func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Pod) (int32, error) {
errCh := make(chan error, len(pods))
successfulDeletes := int32(len(pods))
failDelete := func(pod *v1.Pod, err error) {
// Decrement the expected number of deletes because the informer won't observe this deletion
jm.expectations.DeletionObserved(jobKey)
if !apierrors.IsNotFound(err) { if !apierrors.IsNotFound(err) {
klog.V(2).Infof("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err) klog.V(2).Infof("Failed to delete Pod", "job", klog.KObj(job), "pod", klog.KObj(pod), "err", err)
atomic.AddInt32(&successfulDeletes, -1) atomic.AddInt32(&successfulDeletes, -1)
@ -690,12 +837,251 @@ func (jm *Controller) deleteJobPods(job *batch.Job, jobKey string, pods []*v1.Po
utilruntime.HandleError(err) utilruntime.HandleError(err)
} }
} }
wg := sync.WaitGroup{}
wg.Add(len(pods))
for i := range pods {
go func(pod *v1.Pod) {
defer wg.Done()
if patch := removeTrackingFinalizerPatch(pod); patch != nil {
if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil {
failDelete(pod, fmt.Errorf("removing completion finalizer: %w", err))
return
}
}
if err := jm.podControl.DeletePod(job.Namespace, pod.Name, job); err != nil {
failDelete(pod, err)
}
}(pods[i]) }(pods[i])
} }
wg.Wait() wg.Wait()
return successfulDeletes, errorFromChannel(errCh) return successfulDeletes, errorFromChannel(errCh)
} }
// removeTrackingFinalizersFromAllPods removes finalizers from any Job Pod. This is called
// when Job tracking with finalizers is disabled.
func (jm *Controller) removeTrackingFinalizersFromAllPods(pods []*v1.Pod) error {
var podsWithFinalizer []*v1.Pod
for _, pod := range pods {
if hasJobTrackingFinalizer(pod) {
podsWithFinalizer = append(podsWithFinalizer, pod)
}
}
if len(podsWithFinalizer) == 0 {
return nil
}
_, err := jm.removeTrackingFinalizerFromPods(podsWithFinalizer)
return err
}
// trackJobStatusAndRemoveFinalizers does:
// 1. Add finished Pods to .status.uncountedTerminatedPods
// 2. Remove the finalizers from the Pods if they completed or were removed
// or the job was removed.
// 3. Increment job counters for pods that no longer have a finalizer.
// 4. Add Complete condition if satisfied with current counters.
// It does this in a controlled way such that the size of .status doesn't grow
// too much.
func (jm *Controller) trackJobStatusAndRemoveFinalizers(job *batch.Job, pods []*v1.Pod, succeededIndexes orderedIntervals, uncounted uncountedTerminatedPods, finishedCond *batch.JobCondition, needsFlush bool) error {
isIndexed := isIndexedJob(job)
var podsToRemoveFinalizer []*v1.Pod
uncountedStatus := job.Status.UncountedTerminatedPods
var newSucceededIndexes []int
if isIndexed {
// Sort to introduce completed Indexes First.
sort.Sort(byCompletionIndex(pods))
}
for _, pod := range pods {
if !hasJobTrackingFinalizer(pod) {
continue
}
podFinished := pod.Status.Phase == v1.PodSucceeded || pod.Status.Phase == v1.PodFailed
// Terminating pods are counted as failed. This guarantees that orphan Pods
// count as failures.
// Active pods are terminated when the job has completed, thus they count as
// failures as well.
podTerminating := pod.DeletionTimestamp != nil || finishedCond != nil
if podFinished || podTerminating || job.DeletionTimestamp != nil {
podsToRemoveFinalizer = append(podsToRemoveFinalizer, pod)
}
if pod.Status.Phase == v1.PodSucceeded {
if isIndexed {
// The completion index is enough to avoid recounting succeeded pods.
// No need to track UIDs.
ix := getCompletionIndex(pod.Annotations)
if ix != unknownCompletionIndex && ix < int(*job.Spec.Completions) && !succeededIndexes.has(ix) {
newSucceededIndexes = append(newSucceededIndexes, ix)
needsFlush = true
}
} else if !uncounted.succeeded.Has(string(pod.UID)) {
needsFlush = true
uncountedStatus.Succeeded = append(uncountedStatus.Succeeded, pod.UID)
}
} else if pod.Status.Phase == v1.PodFailed || podTerminating {
ix := getCompletionIndex(pod.Annotations)
if !uncounted.failed.Has(string(pod.UID)) && (!isIndexed || (ix != unknownCompletionIndex && ix < int(*job.Spec.Completions))) {
needsFlush = true
uncountedStatus.Failed = append(uncountedStatus.Failed, pod.UID)
}
}
if len(uncountedStatus.Succeeded)+len(uncountedStatus.Failed) >= maxUncountedPods {
if len(newSucceededIndexes) > 0 {
succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
job.Status.Succeeded = int32(succeededIndexes.total())
job.Status.CompletedIndexes = succeededIndexes.String()
}
var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil {
return err
}
podsToRemoveFinalizer = nil
newSucceededIndexes = nil
}
}
if len(newSucceededIndexes) > 0 {
succeededIndexes = succeededIndexes.withOrderedIndexes(newSucceededIndexes)
job.Status.Succeeded = int32(succeededIndexes.total())
job.Status.CompletedIndexes = succeededIndexes.String()
}
var err error
if needsFlush, err = jm.flushUncountedAndRemoveFinalizers(job, podsToRemoveFinalizer, needsFlush); err != nil {
return err
}
if jm.enactJobFinished(job, finishedCond) {
needsFlush = true
}
if needsFlush {
if err := jm.updateStatusHandler(job); err != nil {
return fmt.Errorf("removing uncounted pods from status: %w", err)
}
}
return nil
}
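// Illustrative sketch (not part of this commit): for Indexed Jobs, the
// succeeded completion indexes are stored compactly in
// .status.completedIndexes as a string such as "0-2,4". The controller uses
// its own orderedIntervals type for this; the standalone example below only
// demonstrates the compression idea with made-up input.
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// compressIndexes turns a (possibly unsorted, possibly duplicated) list of
// completion indexes into the interval string form used by the Job status.
func compressIndexes(indexes []int) string {
	if len(indexes) == 0 {
		return ""
	}
	sorted := append([]int(nil), indexes...)
	sort.Ints(sorted)
	var parts []string
	start, prev := sorted[0], sorted[0]
	flush := func() {
		if start == prev {
			parts = append(parts, strconv.Itoa(start))
		} else {
			parts = append(parts, fmt.Sprintf("%d-%d", start, prev))
		}
	}
	for _, ix := range sorted[1:] {
		if ix == prev || ix == prev+1 {
			prev = ix
			continue
		}
		flush()
		start, prev = ix, ix
	}
	flush()
	return strings.Join(parts, ",")
}

func main() {
	fmt.Println(compressIndexes([]int{0, 1, 2, 4})) // "0-2,4"
	fmt.Println(compressIndexes([]int{5}))          // "5"
}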
// flushUncountedAndRemoveFinalizers does:
// 1. Flush the Job status that might include new uncounted Pod UIDs.
// 2. Remove the finalizers from the Pods that are in the uncounted lists.
// 3. Update the counters based on the Pods for which it successfully removed
// the finalizers.
// 4. (if not all removals succeeded) Flush the Job status again.
// Returns whether there are pending changes in the Job status that need to be
// flushed in subsequent calls.
func (jm *Controller) flushUncountedAndRemoveFinalizers(job *batch.Job, podsToRemoveFinalizer []*v1.Pod, needsFlush bool) (bool, error) {
if needsFlush {
if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("adding uncounted pods to status: %w", err)
}
needsFlush = false
}
var failedToRm []*v1.Pod
var rmErr error
if len(podsToRemoveFinalizer) > 0 {
failedToRm, rmErr = jm.removeTrackingFinalizerFromPods(podsToRemoveFinalizer)
}
uncountedStatus := job.Status.UncountedTerminatedPods
if rmErr == nil {
needsFlush = len(uncountedStatus.Succeeded) > 0 || len(uncountedStatus.Failed) > 0
job.Status.Succeeded += int32(len(uncountedStatus.Succeeded))
uncountedStatus.Succeeded = nil
job.Status.Failed += int32(len(uncountedStatus.Failed))
uncountedStatus.Failed = nil
return needsFlush, nil
}
uidsWithFinalizer := make(sets.String, len(failedToRm))
for _, p := range failedToRm {
uidsWithFinalizer.Insert(string(p.UID))
}
newUncounted := uncountedWithFailedFinalizerRemovals(uncountedStatus.Succeeded, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Succeeded) {
needsFlush = true
job.Status.Succeeded += int32(len(uncountedStatus.Succeeded) - len(newUncounted))
uncountedStatus.Succeeded = newUncounted
}
newUncounted = uncountedWithFailedFinalizerRemovals(uncountedStatus.Failed, uidsWithFinalizer)
if len(newUncounted) != len(uncountedStatus.Failed) {
needsFlush = true
job.Status.Failed += int32(len(uncountedStatus.Failed) - len(newUncounted))
uncountedStatus.Failed = newUncounted
}
if needsFlush {
if err := jm.updateStatusHandler(job); err != nil {
return needsFlush, fmt.Errorf("removing uncounted pods from status: %w", err)
}
}
return needsFlush, rmErr
}
// removeTrackingFinalizerFromPods removes the tracking finalizer from the
// given Pods and returns the Pods for which the removal failed. A Pod that
// was already deleted by the time the patch is sent counts as a successful
// removal.
func (jm *Controller) removeTrackingFinalizerFromPods(pods []*v1.Pod) ([]*v1.Pod, error) {
errCh := make(chan error, len(pods))
var failed []*v1.Pod
var lock sync.Mutex
wg := sync.WaitGroup{}
wg.Add(len(pods))
for i := range pods {
go func(i int) {
pod := pods[i]
defer wg.Done()
if patch := removeTrackingFinalizerPatch(pod); patch != nil {
if err := jm.podControl.PatchPod(pod.Namespace, pod.Name, patch); err != nil && !apierrors.IsNotFound(err) {
errCh <- err
utilruntime.HandleError(err)
lock.Lock()
failed = append(failed, pod)
lock.Unlock()
return
}
}
}(i)
}
wg.Wait()
return failed, errorFromChannel(errCh)
}
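// Illustrative sketch (not part of this commit): the fan-out pattern used
// above, where one goroutine per Pod performs a patch, failures are recorded
// in a shared slice under a mutex, and errors go into a buffered channel that
// is later drained (as errorFromChannel does). The patchOne function below is
// a hypothetical stand-in; the controller uses podControl.PatchPod.
package main

import (
	"errors"
	"fmt"
	"sync"
)

func patchOne(name string) error {
	if name == "pod-1" {
		return errors.New("conflict patching pod-1")
	}
	return nil
}

func main() {
	pods := []string{"pod-0", "pod-1", "pod-2"}
	errCh := make(chan error, len(pods)) // buffered so goroutines never block
	var failed []string
	var mu sync.Mutex
	var wg sync.WaitGroup
	wg.Add(len(pods))
	for i := range pods {
		go func(i int) {
			defer wg.Done()
			if err := patchOne(pods[i]); err != nil {
				errCh <- err
				mu.Lock()
				failed = append(failed, pods[i])
				mu.Unlock()
			}
		}(i)
	}
	wg.Wait()
	// Report at most one of the collected errors, mirroring errorFromChannel.
	select {
	case err := <-errCh:
		fmt.Println("failed pods:", failed, "first error:", err)
	default:
		fmt.Println("all patches succeeded")
	}
}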
// enactJobFinished adds the Complete or Failed condition and records events.
// Returns whether the Job was considered finished.
func (jm *Controller) enactJobFinished(job *batch.Job, finishedCond *batch.JobCondition) bool {
if finishedCond == nil {
return false
}
if uncounted := job.Status.UncountedTerminatedPods; uncounted != nil {
if len(uncounted.Succeeded) > 0 || len(uncounted.Failed) > 0 {
return false
}
}
completionMode := string(batch.NonIndexedCompletion)
if isIndexedJob(job) {
completionMode = string(*job.Spec.CompletionMode)
}
job.Status.Conditions = append(job.Status.Conditions, *finishedCond)
if finishedCond.Type == batch.JobComplete {
if job.Spec.Completions != nil && job.Status.Succeeded > *job.Spec.Completions {
jm.recorder.Event(job, v1.EventTypeWarning, "TooManySucceededPods", "Too many succeeded pods running after completion count reached")
}
job.Status.CompletionTime = &finishedCond.LastTransitionTime
jm.recorder.Event(job, v1.EventTypeNormal, "Completed", "Job completed")
metrics.JobFinishedNum.WithLabelValues(completionMode, "succeeded").Inc()
} else {
jm.recorder.Event(job, v1.EventTypeWarning, finishedCond.Reason, finishedCond.Message)
metrics.JobFinishedNum.WithLabelValues(completionMode, "failed").Inc()
}
return true
}
func uncountedWithFailedFinalizerRemovals(uncounted []types.UID, uidsWithFinalizer sets.String) []types.UID {
var newUncounted []types.UID
for _, uid := range uncounted {
if uidsWithFinalizer.Has(string(uid)) {
newUncounted = append(newUncounted, uid)
}
}
return newUncounted
}
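// Illustrative sketch (not part of this commit): how the sets.String helper
// from k8s.io/apimachinery is used above to keep only the UIDs whose
// finalizer could not be removed; the difference in lengths is what gets
// added to the Job counters before the next status flush. The UID values are
// made up for the example.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
)

func main() {
	// UIDs currently listed in status.uncountedTerminatedPods.succeeded.
	uncounted := []string{"uid-a", "uid-b", "uid-c"}
	// UIDs whose finalizer removal failed and must therefore stay uncounted.
	stillHasFinalizer := sets.NewString("uid-b")

	var keep []string
	for _, uid := range uncounted {
		if stillHasFinalizer.Has(uid) {
			keep = append(keep, uid)
		}
	}
	fmt.Println("kept uncounted:", keep, "newly counted:", len(uncounted)-len(keep))
}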
// pastBackoffLimitOnFailure checks if container restartCounts sum exceeds BackoffLimit
// this method applies only to pods with restartPolicy == OnFailure
func pastBackoffLimitOnFailure(job *batch.Job, pods []*v1.Pod) bool {
@@ -736,8 +1122,8 @@ func pastActiveDeadline(job *batch.Job) bool {
return duration >= allowedDuration
}
-func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatus, reason, message string) batch.JobCondition {
-return batch.JobCondition{
+func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatus, reason, message string) *batch.JobCondition {
+return &batch.JobCondition{
Type: conditionType,
Status: status,
LastProbeTime: metav1.Now(),
@@ -747,23 +1133,33 @@ func newCondition(conditionType batch.JobConditionType, status v1.ConditionStatu
}
}
-// getStatus returns no of succeeded and failed pods running a job
-func getStatus(job *batch.Job, pods []*v1.Pod) (succeeded, failed int32) {
-succeeded = int32(countPodsByPhase(job, pods, v1.PodSucceeded))
-failed = int32(countPodsByPhase(job, pods, v1.PodFailed))
-return
+// getStatus returns number of succeeded and failed pods running a job
+func getStatus(job *batch.Job, pods []*v1.Pod, uncounted *uncountedTerminatedPods) (succeeded, failed int32) {
+if uncounted != nil {
+succeeded = job.Status.Succeeded
+failed = job.Status.Failed
+}
+succeeded += int32(countValidPodsWithFilter(job, pods, uncounted.Succeeded(), func(p *v1.Pod) bool {
+return p.Status.Phase == v1.PodSucceeded
+}))
+failed += int32(countValidPodsWithFilter(job, pods, uncounted.Failed(), func(p *v1.Pod) bool {
+// Counting deleted Pods as failures to account for orphan Pods that never
+// have a chance to reach the Failed phase.
+return p.Status.Phase == v1.PodFailed || (p.DeletionTimestamp != nil && p.Status.Phase != v1.PodSucceeded)
+}))
+return succeeded, failed
}
// jobSuspended returns whether a Job is suspended while taking the feature
// gate into account.
func jobSuspended(job *batch.Job) bool {
-return utilfeature.DefaultFeatureGate.Enabled(features.SuspendJob) && job.Spec.Suspend != nil && *job.Spec.Suspend
+return feature.DefaultFeatureGate.Enabled(features.SuspendJob) && job.Spec.Suspend != nil && *job.Spec.Suspend
}
// manageJob is the core method responsible for managing the number of running
// pods according to what is specified in the job.Spec.
// Does NOT modify <activePods>.
-func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, allPods []*v1.Pod) (int32, string, error) {
+func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded int32, succeededIndexes []interval) (int32, string, error) {
active := int32(len(activePods))
parallelism := *job.Spec.Parallelism
jobKey, err := controller.KeyFunc(job)
@@ -798,6 +1194,9 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
if wantActive > parallelism {
wantActive = parallelism
}
+if wantActive < 0 {
+wantActive = 0
+}
}
rmAtLeast := active - wantActive
@@ -834,7 +1233,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
var indexesToAdd []int
if isIndexedJob(job) {
-indexesToAdd = firstPendingIndexes(allPods, int(diff), int(*job.Spec.Completions))
+indexesToAdd = firstPendingIndexes(activePods, succeededIndexes, int(diff), int(*job.Spec.Completions))
diff = int32(len(indexesToAdd))
}
active += diff
@@ -843,6 +1242,9 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
if isIndexedJob(job) {
addCompletionIndexEnvVariables(podTemplate)
}
+if trackingUncountedPods(job) {
+podTemplate.Finalizers = appendJobCompletionFinalizerIfNotFound(podTemplate.Finalizers)
+}
// Batch the pod creates. Batch sizes start at SlowStartInitialBatchSize
// and double with each successful iteration in a kind of "slow start".
@@ -857,7 +1259,7 @@ func (jm *Controller) manageJob(job *batch.Job, activePods []*v1.Pod, succeeded
wait.Add(int(batchSize))
for i := int32(0); i < batchSize; i++ {
completionIndex := unknownCompletionIndex
-if indexesToAdd != nil {
+if len(indexesToAdd) > 0 {
completionIndex = indexesToAdd[0]
indexesToAdd = indexesToAdd[1:]
}
@@ -952,6 +1354,12 @@ func (jm *Controller) updateJobStatus(job *batch.Job) error {
return err
}
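// Illustrative sketch (not part of this commit): the "slow start" batching
// referenced above. Pod creations are issued in exponentially growing batches
// so that a misconfigured Job fails after a handful of API calls instead of
// thousands. The initial batch size and the sequential create function are
// stand-ins; the controller uses SlowStartInitialBatchSize and issues each
// batch concurrently.
package main

import "fmt"

func slowStartBatch(count, initialBatchSize int, create func() error) (int, error) {
	successes := 0
	remaining := count
	for batchSize := minInt(remaining, initialBatchSize); batchSize > 0; batchSize = minInt(2*batchSize, remaining) {
		for i := 0; i < batchSize; i++ {
			if err := create(); err != nil {
				return successes, err
			}
			successes++
		}
		remaining -= batchSize
	}
	return successes, nil
}

func minInt(a, b int) int {
	if a < b {
		return a
	}
	return b
}

func main() {
	created, err := slowStartBatch(10, 1, func() error {
		fmt.Println("create pod")
		return nil
	})
	// Batches of 1, 2, 4 and then the remaining 3 pods.
	fmt.Println("created:", created, "err:", err)
}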
func (jm *Controller) patchJob(job *batch.Job, data []byte) error {
_, err := jm.kubeClient.BatchV1().Jobs(job.Namespace).Patch(
context.TODO(), job.Name, types.StrategicMergePatchType, data, metav1.PatchOptions{})
return err
}
func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Duration {
exp := queue.NumRequeues(key)
@@ -972,18 +1380,121 @@ func getBackoff(queue workqueue.RateLimitingInterface, key interface{}) time.Dur
return calculated
}
-// countPodsByPhase returns pods based on their phase.
-func countPodsByPhase(job *batch.Job, pods []*v1.Pod, phase v1.PodPhase) int {
-result := 0
+// countValidPodsWithFilter returns number of valid pods that pass the filter.
+// Pods are valid if they have a finalizer and, for Indexed Jobs, a valid
+// completion index.
+func countValidPodsWithFilter(job *batch.Job, pods []*v1.Pod, uncounted sets.String, filter func(*v1.Pod) bool) int {
+result := len(uncounted)
for _, p := range pods {
+// Pods that don't have a completion finalizer are in the uncounted set or
+// have already been accounted for in the Job status.
+if uncounted != nil && (!hasJobTrackingFinalizer(p) || uncounted.Has(string(p.UID))) {
+continue
+}
+if isIndexedJob(job) {
idx := getCompletionIndex(p.Annotations)
-if phase == p.Status.Phase && (!isIndexedJob(job) || (idx != unknownCompletionIndex && idx < int(*job.Spec.Completions))) {
+if idx == unknownCompletionIndex || idx >= int(*job.Spec.Completions) {
+continue
+}
+}
+if filter(p) {
result++
}
}
return result
}
func trackingUncountedPods(job *batch.Job) bool {
return feature.DefaultFeatureGate.Enabled(features.JobTrackingWithFinalizers) && hasJobTrackingAnnotation(job)
}
func hasJobTrackingAnnotation(job *batch.Job) bool {
if job.Annotations == nil {
return false
}
_, ok := job.Annotations[batch.JobTrackingFinalizer]
return ok
}
func appendJobCompletionFinalizerIfNotFound(finalizers []string) []string {
for _, fin := range finalizers {
if fin == batch.JobTrackingFinalizer {
return finalizers
}
}
return append(finalizers, batch.JobTrackingFinalizer)
}
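// Illustrative sketch (not part of this commit): appendJobCompletionFinalizerIfNotFound
// is idempotent, so the pod template can be passed through it on every sync
// without accumulating duplicate finalizers. The constant below mirrors the
// JobTrackingFinalizer value defined in k8s.io/api/batch/v1; the helper name
// is local to this example.
package main

import "fmt"

const jobTrackingFinalizer = "batch.kubernetes.io/job-tracking"

func appendIfNotFound(finalizers []string) []string {
	for _, fin := range finalizers {
		if fin == jobTrackingFinalizer {
			return finalizers
		}
	}
	return append(finalizers, jobTrackingFinalizer)
}

func main() {
	fins := appendIfNotFound(nil)
	fins = appendIfNotFound(fins) // second call is a no-op
	fmt.Println(fins)             // [batch.kubernetes.io/job-tracking]
}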
func removeTrackingFinalizerPatch(pod *v1.Pod) []byte {
if !hasJobTrackingFinalizer(pod) {
return nil
}
patch := map[string]interface{}{
"metadata": map[string]interface{}{
"$deleteFromPrimitiveList/finalizers": []string{batch.JobTrackingFinalizer},
},
}
patchBytes, _ := json.Marshal(patch)
return patchBytes
}
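// Illustrative sketch (not part of this commit): the strategic merge patch
// produced by removeTrackingFinalizerPatch. The $deleteFromPrimitiveList
// directive removes a single entry from metadata.finalizers without sending
// the whole list. The finalizer value mirrors batch.JobTrackingFinalizer.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"$deleteFromPrimitiveList/finalizers": []string{"batch.kubernetes.io/job-tracking"},
		},
	}
	b, err := json.Marshal(patch)
	if err != nil {
		panic(err)
	}
	// {"metadata":{"$deleteFromPrimitiveList/finalizers":["batch.kubernetes.io/job-tracking"]}}
	fmt.Println(string(b))
}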
func removeTrackingAnnotationPatch(job *batch.Job) []byte {
if !hasJobTrackingAnnotation(job) {
return nil
}
patch := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": map[string]interface{}{
batch.JobTrackingFinalizer: nil,
},
},
}
patchBytes, _ := json.Marshal(patch)
return patchBytes
}
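// Illustrative sketch (not part of this commit): the patch built by
// removeTrackingAnnotationPatch. In a strategic merge (or JSON merge) patch,
// setting a map value to null deletes that key, so this removes only the
// tracking annotation and leaves other annotations untouched.
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	patch := map[string]interface{}{
		"metadata": map[string]interface{}{
			"annotations": map[string]interface{}{
				"batch.kubernetes.io/job-tracking": nil,
			},
		},
	}
	b, _ := json.Marshal(patch)
	// {"metadata":{"annotations":{"batch.kubernetes.io/job-tracking":null}}}
	fmt.Println(string(b))
}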
func hasJobTrackingFinalizer(pod *v1.Pod) bool {
for _, fin := range pod.Finalizers {
if fin == batch.JobTrackingFinalizer {
return true
}
}
return false
}
type uncountedTerminatedPods struct {
succeeded sets.String
failed sets.String
}
func newUncountedTerminatedPods(in batch.UncountedTerminatedPods) *uncountedTerminatedPods {
obj := uncountedTerminatedPods{
succeeded: make(sets.String, len(in.Succeeded)),
failed: make(sets.String, len(in.Failed)),
}
for _, v := range in.Succeeded {
obj.succeeded.Insert(string(v))
}
for _, v := range in.Failed {
obj.failed.Insert(string(v))
}
return &obj
}
func (u *uncountedTerminatedPods) Succeeded() sets.String {
if u == nil {
return nil
}
return u.succeeded
}
func (u *uncountedTerminatedPods) Failed() sets.String {
if u == nil {
return nil
}
return u.failed
}
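// Illustrative sketch (not part of this commit): the Succeeded and Failed
// accessors above are safe to call on a nil *uncountedTerminatedPods, which
// lets callers such as getStatus pass the result straight through when
// tracking with finalizers is disabled. This standalone example shows the
// same nil-receiver idiom with a made-up type.
package main

import "fmt"

type tracker struct {
	items map[string]struct{}
}

// Items returns nil when the receiver itself is nil, so callers don't need a
// separate nil check before ranging or testing membership.
func (t *tracker) Items() map[string]struct{} {
	if t == nil {
		return nil
	}
	return t.items
}

func main() {
	var t *tracker // nil: tracking disabled
	_, tracked := t.Items()["uid-a"]
	fmt.Println("tracked:", tracked) // false, no panic
}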
func errorFromChannel(errCh <-chan error) error {
select {
case err := <-errCh:
@@ -1014,7 +1525,7 @@ func ensureJobConditionStatus(list []batch.JobCondition, cType batch.JobConditio
}
// A condition with that type doesn't exist in the list.
if status != v1.ConditionFalse {
-return append(list, newCondition(cType, status, reason, message)), true
+return append(list, *newCondition(cType, status, reason, message)), true
}
return list, false
}

File diff suppressed because it is too large


@@ -529,15 +529,13 @@ func GetKey(obj interface{}, t *testing.T) string {
}
val := reflect.ValueOf(obj).Elem()
name := val.FieldByName("Name").String()
-kind := val.FieldByName("Kind").String()
-// Note kind is not always set in the tests, so ignoring that for now
-if len(name) == 0 || len(kind) == 0 {
+if len(name) == 0 {
t.Errorf("Unexpected object %v", obj)
}
key, err := keyFunc(obj)
if err != nil {
-t.Errorf("Unexpected error getting key for %v %v: %v", kind, name, err)
+t.Errorf("Unexpected error getting key for %T %v: %v", val.Interface(), name, err)
return ""
}
return key


@@ -231,7 +231,7 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "job-controller"},
Rules: []rbacv1.PolicyRule{
-rbacv1helpers.NewRule("get", "list", "watch", "update").Groups(batchGroup).Resources("jobs").RuleOrDie(),
+rbacv1helpers.NewRule("get", "list", "watch", "update", "patch").Groups(batchGroup).Resources("jobs").RuleOrDie(),
rbacv1helpers.NewRule("update").Groups(batchGroup).Resources("jobs/status").RuleOrDie(),
rbacv1helpers.NewRule("update").Groups(batchGroup).Resources("jobs/finalizers").RuleOrDie(),
rbacv1helpers.NewRule("list", "watch", "create", "delete", "patch").Groups(legacyGroup).Resources("pods").RuleOrDie(),


@@ -791,6 +791,7 @@ items:
verbs:
- get
- list
+- patch
- update
- watch
- apiGroups:


@@ -44,10 +44,16 @@ import (
"k8s.io/utils/pointer"
)
const waitInterval = 500 * time.Millisecond
// TestNonParallelJob tests a Job that only executes one Pod. The test
// recreates the Job controller at some points to make sure a new controller
// is able to pick up where the previous one left off.
func TestNonParallelJob(t *testing.T) {
+for _, wFinalizers := range []bool{false, true} {
+t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
+defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
@@ -59,9 +65,12 @@ func TestNonParallelJob(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
+if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
+t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
+}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
-})
+}, wFinalizers)
// Restarting controller.
cancel()
@@ -74,7 +83,7 @@ func TestNonParallelJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Failed: 1,
-})
+}, wFinalizers)
// Restarting controller.
cancel()
@@ -88,10 +97,17 @@ func TestNonParallelJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 1,
Succeeded: 1,
+}, false)
+validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
}
+}
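// Illustrative sketch (not part of this commit): the pattern the integration
// tests above use to run every scenario twice, once with the
// JobTrackingWithFinalizers gate off and once with it on. The body here is a
// placeholder; the real tests flip the gate with SetFeatureGateDuringTest and
// then drive the Job controller.
package main

import (
	"fmt"
	"testing"
)

func TestWithAndWithoutFinalizers(t *testing.T) {
	for _, wFinalizers := range []bool{false, true} {
		wFinalizers := wFinalizers // capture range variable for the subtest
		t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
			// Placeholder body; a real test would create a Job here and
			// validate pod counts and finalizers for this mode.
			t.Logf("running with finalizers=%t", wFinalizers)
		})
	}
}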
func TestParallelJob(t *testing.T) {
+for _, wFinalizers := range []bool{false, true} {
+t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
+defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
closeFn, restConfig, clientSet, ns := setup(t, "parallel")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
@@ -107,7 +123,7 @@ func TestParallelJob(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 5,
-})
+}, wFinalizers)
// Failed Pods are replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
@@ -115,7 +131,7 @@ func TestParallelJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 5,
Failed: 2,
-})
+}, wFinalizers)
// Once one Pod succeeds, no more Pods are created, even if some fail.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
@@ -124,7 +140,7 @@ func TestParallelJob(t *testing.T) {
Failed: 2,
Succeeded: 1,
Active: 4,
-})
+}, wFinalizers)
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
}
@@ -132,7 +148,7 @@ func TestParallelJob(t *testing.T) {
Failed: 4,
Succeeded: 1,
Active: 2,
-})
+}, wFinalizers)
// No more Pods are created after remaining Pods succeed.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
@@ -141,10 +157,16 @@ func TestParallelJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 4,
Succeeded: 3,
+}, false)
+validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
}
+}
func TestParallelJobWithCompletions(t *testing.T) {
+for _, wFinalizers := range []bool{false, true} {
+t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
+defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
closeFn, restConfig, clientSet, ns := setup(t, "completions")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
@@ -159,9 +181,12 @@ func TestParallelJobWithCompletions(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
+if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
+t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
+}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 4,
-})
+}, wFinalizers)
// Failed Pods are replaced.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodFailed, 2); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodFailed, err)
@@ -169,7 +194,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 4,
Failed: 2,
-})
+}, wFinalizers)
// Pods are created until the number of succeeded Pods equals completions.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodSucceeded, err)
@@ -178,7 +203,7 @@ func TestParallelJobWithCompletions(t *testing.T) {
Failed: 2,
Succeeded: 3,
Active: 3,
-})
+}, wFinalizers)
// No more Pods are created after the Job completes.
if err := setJobPodsPhase(ctx, clientSet, jobObj, v1.PodSucceeded, 3); err != nil {
t.Fatalf("Failed setting phase %s on Job Pods: %v", v1.PodSucceeded, err)
@@ -187,10 +212,16 @@ func TestParallelJobWithCompletions(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Failed: 2,
Succeeded: 6,
+}, false)
+validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
})
}
+}
func TestIndexedJob(t *testing.T) {
+for _, wFinalizers := range []bool{false, true} {
+t.Run(fmt.Sprintf("finalizers=%t", wFinalizers), func(t *testing.T) {
+defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, wFinalizers)()
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.IndexedJob, true)()
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
@@ -211,9 +242,12 @@ func TestIndexedJob(t *testing.T) {
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
+if got := hasJobTrackingAnnotation(jobObj); got != wFinalizers {
+t.Errorf("apiserver created job with tracking annotation: %t, want %t", got, wFinalizers)
+}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
-})
+}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 1, 2), "")
// One Pod succeeds.
@@ -223,7 +257,7 @@ func TestIndexedJob(t *testing.T) {
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 3,
Succeeded: 1,
-})
+}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
// Disable feature gate and restart controller.
@@ -254,7 +288,7 @@ func TestIndexedJob(t *testing.T) {
Active: 3,
Failed: 1,
Succeeded: 1,
-})
+}, wFinalizers)
validateIndexedJobPods(ctx, t, clientSet, jobObj, sets.NewInt(0, 2, 3), "1")
// Remaining Pods succeed.
@@ -265,9 +299,146 @@ func TestIndexedJob(t *testing.T) {
Active: 0,
Failed: 1,
Succeeded: 4,
-})
+}, false)
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
validateJobSucceeded(ctx, t, clientSet, jobObj)
+validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
+})
+}
+}
// TestDisableJobTrackingWithFinalizers ensures that when the
// JobTrackingWithFinalizers feature is disabled, tracking finalizers are
// removed from all pods, but the Job continues to be tracked.
// This test can be removed once the feature graduates to GA.
func TestDisableJobTrackingWithFinalizers(t *testing.T) {
// Step 1: job created while feature is enabled.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
defer func() {
cancel()
}()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(2),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver didn't add the tracking annotation")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
}, true)
// Step 2: Disable tracking with finalizers.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, false)()
cancel()
// Fail a pod while Job controller is stopped.
if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodFailed, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
// Restart controller.
ctx, cancel = startJobController(restConfig, clientSet)
// Ensure Job continues to be tracked and finalizers are removed.
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 2,
Failed: 1,
}, false)
jobObj, err = clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Obtaining updated Job object: %v", err)
}
if hasJobTrackingAnnotation(jobObj) {
t.Error("controller didn't remove the tracking annotation")
}
// Step 3: Reenable tracking with finalizers.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
cancel()
// Succeed a pod while Job controller is stopped.
if err := setJobPodsPhase(context.Background(), clientSet, jobObj, v1.PodSucceeded, 1); err != nil {
t.Fatalf("Failed setting phase %s on Job Pod: %v", v1.PodFailed, err)
}
// Restart controller.
ctx, cancel = startJobController(restConfig, clientSet)
// Ensure Job continues to be tracked and finalizers are removed.
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
Failed: 1,
Succeeded: 1,
}, false)
}
func TestOrphanPodsFinalizersCleared(t *testing.T) {
// Step 1: job created while feature is enabled.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, true)()
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobController(restConfig, clientSet)
defer func() {
cancel()
}()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
Parallelism: pointer.Int32Ptr(1),
},
})
if err != nil {
t.Fatalf("Failed to create Job: %v", err)
}
if !hasJobTrackingAnnotation(jobObj) {
t.Error("apiserver didn't add the tracking annotation")
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, podsByStatus{
Active: 1,
}, true)
// Step 2: Disable tracking with finalizers.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.JobTrackingWithFinalizers, false)()
cancel()
// Delete the Job while controller is stopped.
err = clientSet.BatchV1().Jobs(jobObj.Namespace).Delete(context.Background(), jobObj.Name, metav1.DeleteOptions{})
if err != nil {
t.Fatalf("Failed to delete job: %v", err)
}
// Restart controller.
ctx, cancel = startJobController(restConfig, clientSet)
if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (done bool, err error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Falied to list Job Pods: %v", err)
}
sawPods := false
for _, pod := range pods.Items {
if isPodOwnedByJob(&pod, jobObj) {
if hasJobTrackingFinalizer(&pod) {
return false, nil
}
sawPods = true
}
}
return sawPods, nil
}); err != nil {
t.Errorf("Waiting for finalizers to be removed: %v", err)
}
}
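// Illustrative sketch (not part of this commit): how these integration tests
// use wait.Poll from k8s.io/apimachinery to retry a check until it reports
// done (or the timeout expires). The condition below is a trivial counter
// instead of a real API read.
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	attempts := 0
	err := wait.Poll(10*time.Millisecond, time.Second, func() (bool, error) {
		attempts++
		// Return true to stop polling, false to try again, or a non-nil
		// error to abort immediately.
		return attempts >= 3, nil
	})
	fmt.Println("attempts:", attempts, "err:", err)
}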
func TestSuspendJob(t *testing.T) {
@@ -336,7 +507,7 @@ func TestSuspendJob(t *testing.T) {
validate := func(s string, active int, status v1.ConditionStatus, reason string) {
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: active,
-})
+}, false)
job, err = clientSet.BatchV1().Jobs(ns.Name).Get(ctx, job.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get Job after %s: %v", s, err)
@@ -382,7 +553,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: 0,
-})
+}, false)
// Disable feature gate and restart controller to test that pods get created.
defer featuregatetesting.SetFeatureGateDuringTest(t, feature.DefaultFeatureGate, features.SuspendJob, false)()
@@ -394,7 +565,7 @@ func TestSuspendJobControllerRestart(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, job, podsByStatus{
Active: 2,
-})
+}, false)
}
type podsByStatus struct {
@@ -403,10 +574,10 @@ type podsByStatus struct {
Succeeded int
}
-func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus) {
+func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job, desired podsByStatus, wFinalizer bool) {
t.Helper()
var actualCounts podsByStatus
-if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
+if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
updatedJob, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get updated Job: %v", err)
@@ -419,23 +590,46 @@ func validateJobPodsStatus(ctx context.Context, t *testing.T, clientSet clientse
return cmp.Equal(actualCounts, desired), nil
}); err != nil {
diff := cmp.Diff(desired, actualCounts)
-t.Errorf("Waiting for Job Pods: %v\nPods (-want,+got):\n%s", err, diff)
+t.Errorf("Waiting for Job Status: %v\nPods (-want,+got):\n%s", err, diff)
}
-// Verify active Pods. No need for another wait.Poll.
+var active []*v1.Pod
+if err := wait.PollImmediate(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
if err != nil {
t.Fatalf("Failed to list Job Pods: %v", err)
}
-active := 0
+active = nil
for _, pod := range pods.Items {
-if isPodOwnedByJob(&pod, jobObj) {
-if pod.Status.Phase == v1.PodPending || pod.Status.Phase == v1.PodRunning {
-active++
+phase := pod.Status.Phase
+if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) {
+p := pod
+active = append(active, &p)
+}
+}
+return len(active) == desired.Active, nil
+}); err != nil {
+if len(active) != desired.Active {
+t.Errorf("Found %d active Pods, want %d", len(active), desired.Active)
+}
+}
+for _, p := range active {
+if got := hasJobTrackingFinalizer(p); got != wFinalizer {
+t.Errorf("Pod %s has tracking finalizer %t, want %t", p.Name, got, wFinalizer)
}
}
}
-if active != desired.Active {
-t.Errorf("Found %d active Pods, want %d", active, desired.Active)
-}
-}
+func validateFinishedPodsNoFinalizer(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
+t.Helper()
+pods, err := clientSet.CoreV1().Pods(jobObj.Namespace).List(ctx, metav1.ListOptions{})
+if err != nil {
+t.Fatalf("Failed to list Job Pods: %v", err)
+}
+for _, pod := range pods.Items {
+phase := pod.Status.Phase
+if isPodOwnedByJob(&pod, jobObj) && (phase == v1.PodPending || phase == v1.PodRunning) && hasJobTrackingFinalizer(&pod) {
+t.Errorf("Finished pod %s still has a tracking finalizer", pod.Name)
+}
+}
+}
@@ -484,7 +678,7 @@ func waitForEvent(events watch.Interface, uid types.UID, reason string) error {
if reason == "" {
return nil
}
-return wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
+return wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
for {
var ev watch.Event
select {
@@ -515,7 +709,7 @@ func getJobConditionStatus(ctx context.Context, job *batchv1.Job, cType batchv1.
func validateJobSucceeded(ctx context.Context, t *testing.T, clientSet clientset.Interface, jobObj *batchv1.Job) {
t.Helper()
-if err := wait.Poll(time.Second, wait.ForeverTestTimeout, func() (bool, error) {
+if err := wait.Poll(waitInterval, wait.ForeverTestTimeout, func() (bool, error) {
j, err := clientSet.BatchV1().Jobs(jobObj.Namespace).Get(ctx, jobObj.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to obtain updated Job: %v", err)
@@ -632,3 +826,20 @@ func startJobController(restConfig *restclient.Config, clientSet clientset.Inter
go jc.Run(1, ctx.Done())
return ctx, cancel
}
func hasJobTrackingFinalizer(obj metav1.Object) bool {
for _, fin := range obj.GetFinalizers() {
if fin == batchv1.JobTrackingFinalizer {
return true
}
}
return false
}
func hasJobTrackingAnnotation(job *batchv1.Job) bool {
if job.Annotations == nil {
return false
}
_, ok := job.Annotations[batchv1.JobTrackingFinalizer]
return ok
}