From 70d3bb43e568af58c17d19c8319c558da9a33f47 Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Fri, 2 Jun 2023 18:42:19 +0200
Subject: [PATCH 1/2] Adjust the algorithm for computing the pod finish time

Change-Id: Ic282a57169cab8dc498574f08b081914218a1039
---
 pkg/controller/job/backoff_utils.go      | 67 ++++++++++++------
 pkg/controller/job/backoff_utils_test.go | 87 ++++++++++++++++++++++--
 pkg/controller/job/job_controller.go     |  2 +-
 3 files changed, 126 insertions(+), 30 deletions(-)

diff --git a/pkg/controller/job/backoff_utils.go b/pkg/controller/job/backoff_utils.go
index 16c0f3aa3f2..6a7b55e64e1 100644
--- a/pkg/controller/job/backoff_utils.go
+++ b/pkg/controller/job/backoff_utils.go
@@ -24,6 +24,9 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/utils/clock"
+	"k8s.io/utils/pointer"
+
+	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
 )
 
 type backoffRecord struct {
@@ -86,8 +89,7 @@ var backoffRecordKeyFunc = func(obj interface{}) (string, error) {
 	return "", fmt.Errorf("could not find key for obj %#v", obj)
 }
 
-func (backoffRecordStore *backoffStore) newBackoffRecord(clock clock.WithTicker, key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord {
-	now := clock.Now()
+func (backoffRecordStore *backoffStore) newBackoffRecord(key string, newSucceededPods []*v1.Pod, newFailedPods []*v1.Pod) backoffRecord {
 	var backoff *backoffRecord
 
 	if b, exists, _ := backoffRecordStore.store.GetByKey(key); exists {
@@ -105,8 +107,8 @@ func (backoffRecordStore *backoffStore) newBackoffRecord(clock clock.WithTicker,
 		}
 	}
 
-	sortByFinishedTime(newSucceededPods, now)
-	sortByFinishedTime(newFailedPods, now)
+	sortByFinishedTime(newSucceededPods)
+	sortByFinishedTime(newFailedPods)
 
 	if len(newSucceededPods) == 0 {
 		if len(newFailedPods) == 0 {
@@ -114,7 +116,7 @@
 		}
 
 		backoff.failuresAfterLastSuccess = backoff.failuresAfterLastSuccess + int32(len(newFailedPods))
-		lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1], now)
+		lastFailureTime := getFinishedTime(newFailedPods[len(newFailedPods)-1])
 		backoff.lastFailureTime = &lastFailureTime
 
 		return *backoff
@@ -128,9 +130,9 @@
 	backoff.failuresAfterLastSuccess = 0
 	backoff.lastFailureTime = nil
 
-	lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1], now)
+	lastSuccessTime := getFinishedTime(newSucceededPods[len(newSucceededPods)-1])
 	for i := len(newFailedPods) - 1; i >= 0; i-- {
-		failedTime := getFinishedTime(newFailedPods[i], now)
+		failedTime := getFinishedTime(newFailedPods[i])
 		if !failedTime.After(lastSuccessTime) {
 			break
 		}
@@ -146,39 +148,60 @@
 	return *backoff
 }
 
-func sortByFinishedTime(pods []*v1.Pod, currentTime time.Time) {
+func sortByFinishedTime(pods []*v1.Pod) {
 	sort.Slice(pods, func(i, j int) bool {
 		p1 := pods[i]
 		p2 := pods[j]
-		p1FinishTime := getFinishedTime(p1, currentTime)
-		p2FinishTime := getFinishedTime(p2, currentTime)
+		p1FinishTime := getFinishedTime(p1)
+		p2FinishTime := getFinishedTime(p2)
 
 		return p1FinishTime.Before(p2FinishTime)
 	})
 }
 
-func getFinishedTime(p *v1.Pod, currentTime time.Time) time.Time {
+func getFinishedTime(p *v1.Pod) time.Time {
+	finishTime := getFinishTimeFromContainers(p)
+	if finishTime == nil {
+		finishTime = getFinishTimeFromPodReadyFalseCondition(p)
+	}
+	if finishTime == nil {
+		finishTime = getFinishTimeFromDeletionTimestamp(p)
+	}
+	if finishTime != nil {
+		return *finishTime
+	}
+	return p.CreationTimestamp.Time
+}
+
+func getFinishTimeFromContainers(p *v1.Pod) *time.Time {
 	var finishTime *time.Time
 	for _, containerState := range p.Status.ContainerStatuses {
 		if containerState.State.Terminated == nil {
-			finishTime = nil
-			break
+			return nil
 		}
-
-		if finishTime == nil {
+		if containerState.State.Terminated.FinishedAt.Time.IsZero() {
+			return nil
+		}
+		if finishTime == nil || finishTime.Before(containerState.State.Terminated.FinishedAt.Time) {
 			finishTime = &containerState.State.Terminated.FinishedAt.Time
-		} else {
-			if finishTime.Before(containerState.State.Terminated.FinishedAt.Time) {
-				finishTime = &containerState.State.Terminated.FinishedAt.Time
-			}
 		}
 	}
+	return finishTime
+}
 
-	if finishTime == nil || finishTime.IsZero() {
-		return currentTime
+func getFinishTimeFromPodReadyFalseCondition(p *v1.Pod) *time.Time {
+	if _, c := apipod.GetPodCondition(&p.Status, v1.PodReady); c != nil && c.Status == v1.ConditionFalse && !c.LastTransitionTime.Time.IsZero() {
+		return &c.LastTransitionTime.Time
 	}
+	return nil
+}
 
-	return *finishTime
+func getFinishTimeFromDeletionTimestamp(p *v1.Pod) *time.Time {
+	if p.DeletionTimestamp != nil {
+		finishTime := p.DeletionTimestamp.Time.Add(-time.Duration(pointer.Int64Deref(p.DeletionGracePeriodSeconds, 0)) * time.Second)
+		return &finishTime
+	}
+	return nil
 }
 
 func (backoff backoffRecord) getRemainingTime(clock clock.WithTicker, defaultBackoff time.Duration, maxBackoff time.Duration) time.Duration {
diff --git a/pkg/controller/job/backoff_utils_test.go b/pkg/controller/job/backoff_utils_test.go
index 79e0f2a4d2c..00659c4bb4d 100644
--- a/pkg/controller/job/backoff_utils_test.go
+++ b/pkg/controller/job/backoff_utils_test.go
@@ -24,6 +24,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	clocktesting "k8s.io/utils/clock/testing"
+	"k8s.io/utils/pointer"
 )
 
 func TestNewBackoffRecord(t *testing.T) {
@@ -189,9 +190,7 @@
 			})
 		}
 
-		fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second))
-
-		backoffRecord := backoffRecordStore.newBackoffRecord(fakeClock, "key", newSucceededPods, newFailedPods)
+		backoffRecord := backoffRecordStore.newBackoffRecord("key", newSucceededPods, newFailedPods)
 		if diff := cmp.Diff(tc.wantBackoffRecord, backoffRecord, cmp.AllowUnexported(backoffRecord)); diff != "" {
 			t.Errorf("backoffRecord not matching; (-want,+got): %v", diff)
 		}
@@ -201,6 +200,7 @@
 
 func TestGetFinishedTime(t *testing.T) {
 	defaultTestTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
+	defaultTestTimeMinus30s := defaultTestTime.Add(-30 * time.Second)
 	testCases := map[string]struct {
 		pod            v1.Pod
 		wantFinishTime time.Time
@@ -229,7 +229,7 @@
 			},
 			wantFinishTime: defaultTestTime,
 		},
-		"Pod with multiple containers; two containers in terminated state and one in running state": {
+		"Pod with multiple containers; two containers in terminated state and one in running state; fallback to deletionTimestamp": {
 			pod: v1.Pod{
 				Status: v1.PodStatus{
 					ContainerStatuses: []v1.ContainerStatus{
@@ -250,10 +250,13 @@
 						},
 					},
 				},
+				ObjectMeta: metav1.ObjectMeta{
+					DeletionTimestamp: &metav1.Time{Time: defaultTestTime},
+				},
 			},
 			wantFinishTime: defaultTestTime,
 		},
-		"Pod with single container in running state": {
+		"fallback to deletionTimestamp": {
 			pod: v1.Pod{
 				Status: v1.PodStatus{
 					ContainerStatuses: []v1.ContainerStatus{
@@ -264,10 +267,77 @@
 						},
 					},
 				},
+				ObjectMeta: metav1.ObjectMeta{
+					DeletionTimestamp: &metav1.Time{Time: defaultTestTime},
+				},
 			},
 			wantFinishTime: defaultTestTime,
 		},
-		"Pod with single container with zero finish time": {
+		"fallback to deletionTimestamp, decremented by grace period": {
+			pod: v1.Pod{
+				Status: v1.PodStatus{
+					ContainerStatuses: []v1.ContainerStatus{
+						{
+							State: v1.ContainerState{
+								Running: &v1.ContainerStateRunning{},
+							},
+						},
+					},
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					DeletionTimestamp:          &metav1.Time{Time: defaultTestTime},
+					DeletionGracePeriodSeconds: pointer.Int64(30),
+				},
+			},
+			wantFinishTime: defaultTestTimeMinus30s,
+		},
+		"fallback to PodReady.LastTransitionTime when status of the condition is False": {
+			pod: v1.Pod{
+				Status: v1.PodStatus{
+					ContainerStatuses: []v1.ContainerStatus{
+						{
+							State: v1.ContainerState{
+								Terminated: &v1.ContainerStateTerminated{},
+							},
+						},
+					},
+					Conditions: []v1.PodCondition{
+						{
+							Type:               v1.PodReady,
+							Status:             v1.ConditionFalse,
+							Reason:             "PodFailed",
+							LastTransitionTime: metav1.Time{Time: defaultTestTime},
+						},
+					},
+				},
+			},
+			wantFinishTime: defaultTestTime,
+		},
+		"skip fallback to PodReady.LastTransitionTime when status of the condition is True": {
+			pod: v1.Pod{
+				Status: v1.PodStatus{
+					ContainerStatuses: []v1.ContainerStatus{
+						{
+							State: v1.ContainerState{
+								Terminated: &v1.ContainerStateTerminated{},
+							},
+						},
+					},
+					Conditions: []v1.PodCondition{
+						{
+							Type:               v1.PodReady,
+							Status:             v1.ConditionTrue,
+							LastTransitionTime: metav1.Time{Time: defaultTestTimeMinus30s},
+						},
+					},
+				},
+				ObjectMeta: metav1.ObjectMeta{
+					DeletionTimestamp: &metav1.Time{Time: defaultTestTime},
+				},
+			},
+			wantFinishTime: defaultTestTime,
+		},
+		"fallback to creationTimestamp": {
 			pod: v1.Pod{
 				Status: v1.PodStatus{
 					ContainerStatuses: []v1.ContainerStatus{
@@ -278,6 +348,9 @@
 					},
 				},
 			},
+			ObjectMeta: metav1.ObjectMeta{
+				CreationTimestamp: metav1.Time{Time: defaultTestTime},
+			},
 			},
 			wantFinishTime: defaultTestTime,
 		},
@@ -285,7 +358,7 @@
 
 	for name, tc := range testCases {
 		t.Run(name, func(t *testing.T) {
-			f := getFinishedTime(&tc.pod, defaultTestTime)
+			f := getFinishedTime(&tc.pod)
 			if !f.Equal(tc.wantFinishTime) {
 				t.Errorf("Expected value of finishedTime %v; got %v", tc.wantFinishTime, f)
 			}
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index d4b32faa634..da5e5e0f25c 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -762,7 +762,7 @@ func (jm *Controller) syncJob(ctx context.Context, key string) (rErr error) {
 		job.Status.StartTime = &now
 	}
 
-	newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(jm.clock, key, newSucceededPods, newFailedPods)
+	newBackoffInfo := jm.backoffRecordStore.newBackoffRecord(key, newSucceededPods, newFailedPods)
 
 	var manageJobErr error
 	var finishedCondition *batch.JobCondition

From 71ab7dc791f816ccd06d10784e6160062a080f8f Mon Sep 17 00:00:00 2001
From: Michal Wozniak
Date: Mon, 5 Jun 2023 10:36:11 +0200
Subject: [PATCH 2/2] Remarks

---
 pkg/controller/job/backoff_utils.go | 28 ++++++++++++++++++----------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/pkg/controller/job/backoff_utils.go b/pkg/controller/job/backoff_utils.go
index 6a7b55e64e1..4a3a5e7fba8 100644
--- a/pkg/controller/job/backoff_utils.go
+++ b/pkg/controller/job/backoff_utils.go
@@ -23,10 +23,9 @@ import (
 
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/tools/cache"
+	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/utils/clock"
 	"k8s.io/utils/pointer"
-
-	apipod "k8s.io/kubernetes/pkg/api/v1/pod"
 )
 
 type backoffRecord struct {
@@ -159,17 +158,26 @@ func sortByFinishedTime(pods []*v1.Pod) {
 	})
 }
 
+// Returns the pod finish time using the following lookups:
+// 1. if all containers finished, use the latest time
+// 2. if the pod has Ready=False condition, use the last transition time
+// 3. if the pod has been deleted, use the `deletionTimestamp - grace_period` to estimate the moment of deletion
+// 4. fallback to pod's creation time
+//
+// Pods owned by Kubelet are marked with Ready=False condition when
+// transitioning to terminal phase, thus being handled by (1.) or (2.).
+// Orphaned pods are deleted by PodGC, thus being handled by (3.).
 func getFinishedTime(p *v1.Pod) time.Time {
-	finishTime := getFinishTimeFromContainers(p)
-	if finishTime == nil {
-		finishTime = getFinishTimeFromPodReadyFalseCondition(p)
-	}
-	if finishTime == nil {
-		finishTime = getFinishTimeFromDeletionTimestamp(p)
-	}
-	if finishTime != nil {
+	if finishTime := getFinishTimeFromContainers(p); finishTime != nil {
 		return *finishTime
 	}
+	if finishTime := getFinishTimeFromPodReadyFalseCondition(p); finishTime != nil {
+		return *finishTime
+	}
+	if finishTime := getFinishTimeFromDeletionTimestamp(p); finishTime != nil {
+		return *finishTime
+	}
+	// This should not happen in clusters with Kubelet and PodGC running.
 	return p.CreationTimestamp.Time
 }
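
Reviewer note, not part of either commit: the precedence among the lookups in the new getFinishedTime can be illustrated with a hypothetical test placed in the same package as backoff_utils.go. The test name and timestamps below are invented for illustration; getFinishedTime, metav1, and k8s.io/utils/pointer are the same identifiers the patch itself uses.

package job

import (
	"testing"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

// TestGetFinishedTimePrecedenceSketch (hypothetical) checks that a finish time
// derived from terminated containers (lookup 1) wins over the
// deletionTimestamp fallback (lookup 3).
func TestGetFinishedTimePrecedenceSketch(t *testing.T) {
	finishedAt := time.Date(2023, time.June, 2, 12, 0, 0, 0, time.UTC)
	deletedAt := finishedAt.Add(5 * time.Minute)
	pod := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			// Lookup (3) would yield deletedAt minus the grace period,
			// but it is never reached because lookup (1) succeeds.
			DeletionTimestamp:          &metav1.Time{Time: deletedAt},
			DeletionGracePeriodSeconds: pointer.Int64(30),
		},
		Status: v1.PodStatus{
			ContainerStatuses: []v1.ContainerStatus{
				{
					State: v1.ContainerState{
						Terminated: &v1.ContainerStateTerminated{
							FinishedAt: metav1.Time{Time: finishedAt},
						},
					},
				},
			},
		},
	}
	if got := getFinishedTime(&pod); !got.Equal(finishedAt) {
		t.Errorf("expected finish time %v, got %v", finishedAt, got)
	}
}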