From 12d308f5c427c53f951c5a4f500fffddb57d174e Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor Date: Wed, 19 Oct 2022 09:49:38 -0400 Subject: [PATCH] Add metric for terminated pods with tracking finalizer Change-Id: I26f3169588c30ed82250cb7baff8e277f8d13bb7 --- pkg/controller/job/job_controller.go | 14 +-- pkg/controller/job/metrics/metrics.go | 18 ++++ pkg/controller/job/tracking_utils.go | 32 +++++++ pkg/controller/job/tracking_utils_test.go | 111 ++++++++++++++++++++++ test/integration/job/job_test.go | 41 ++++++-- 5 files changed, 197 insertions(+), 19 deletions(-) diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go index 70609da2953..42c5aa75a47 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -246,6 +246,7 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta // When a pod is created, enqueue the controller that manages it and update its expectations. func (jm *Controller) addPod(obj interface{}) { pod := obj.(*v1.Pod) + recordFinishedPodWithTrackingFinalizer(nil, pod) if pod.DeletionTimestamp != nil { // on a restart of the controller, it's possible a new pod shows up in a state that // is already pending deletion. Prevent the pod from being a creation observation. @@ -288,6 +289,7 @@ func (jm *Controller) addPod(obj interface{}) { func (jm *Controller) updatePod(old, cur interface{}) { curPod := cur.(*v1.Pod) oldPod := old.(*v1.Pod) + recordFinishedPodWithTrackingFinalizer(oldPod, curPod) if curPod.ResourceVersion == oldPod.ResourceVersion { // Periodic resync will send update events for all known pods. // Two different versions of the same pod will always have different RVs. @@ -362,6 +364,9 @@ func (jm *Controller) updatePod(old, cur interface{}) { // obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item. 
func (jm *Controller) deletePod(obj interface{}, final bool) { pod, ok := obj.(*v1.Pod) + if final { + recordFinishedPodWithTrackingFinalizer(pod, nil) + } // When a delete is dropped, the relist will notice a pod in the store not // in the list, leading to the insertion of a tombstone object which contains @@ -1673,15 +1678,6 @@ func removeTrackingAnnotationPatch(job *batch.Job) []byte { return patchBytes } -func hasJobTrackingFinalizer(pod *v1.Pod) bool { - for _, fin := range pod.Finalizers { - if fin == batch.JobTrackingFinalizer { - return true - } - } - return false -} - type uncountedTerminatedPods struct { succeeded sets.String failed sets.String diff --git a/pkg/controller/job/metrics/metrics.go b/pkg/controller/job/metrics/metrics.go index b9271426661..0c0fb0cbd74 100644 --- a/pkg/controller/job/metrics/metrics.go +++ b/pkg/controller/job/metrics/metrics.go @@ -83,6 +83,18 @@ var ( Help: "The number of finished Pods that are fully tracked", }, []string{"completion_mode", "result"}) + + // TerminatedPodsTrackingFinalizerTotal records the addition and removal of + // terminated pods that have the finalizer batch.kubernetes.io/job-tracking, + // regardless of whether they are owned by a Job. + TerminatedPodsTrackingFinalizerTotal = metrics.NewCounterVec( + &metrics.CounterOpts{ + Subsystem: JobControllerSubsystem, + Name: "terminated_pods_tracking_finalizer_total", + Help: `The number of terminated pods (phase=Failed|Succeeded) +that have the finalizer batch.kubernetes.io/job-tracking +The event label can be "add" or "delete".`, + }, []string{"event"}) ) const ( @@ -109,6 +121,11 @@ const ( Succeeded = "succeeded" Failed = "failed" + + // Possible values for "event" label in the terminated_pods_tracking_finalizer_total + // metric. 
+ Add = "add" + Delete = "delete" ) var registerMetrics sync.Once @@ -120,5 +137,6 @@ func Register() { legacyregistry.MustRegister(JobSyncNum) legacyregistry.MustRegister(JobFinishedNum) legacyregistry.MustRegister(JobPodsFinished) + legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal) }) } diff --git a/pkg/controller/job/tracking_utils.go b/pkg/controller/job/tracking_utils.go index e297434354d..b6be2df7b6a 100644 --- a/pkg/controller/job/tracking_utils.go +++ b/pkg/controller/job/tracking_utils.go @@ -20,9 +20,12 @@ import ( "fmt" "sync" + batch "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" + "k8s.io/kubernetes/pkg/controller/job/metrics" ) // uidSetKeyFunc to parse out the key from a uidSet. @@ -118,3 +121,32 @@ func (u *uidTrackingExpectations) deleteExpectations(jobKey string) { func newUIDTrackingExpectations() *uidTrackingExpectations { return &uidTrackingExpectations{store: cache.NewStore(uidSetKeyFunc)} } + +func hasJobTrackingFinalizer(pod *v1.Pod) bool { + for _, fin := range pod.Finalizers { + if fin == batch.JobTrackingFinalizer { + return true + } + } + return false +} + +func recordFinishedPodWithTrackingFinalizer(oldPod, newPod *v1.Pod) { + was := isFinishedPodWithTrackingFinalizer(oldPod) + is := isFinishedPodWithTrackingFinalizer(newPod) + if was == is { + return + } + var event = metrics.Delete + if is { + event = metrics.Add + } + metrics.TerminatedPodsTrackingFinalizerTotal.WithLabelValues(event).Inc() +} + +func isFinishedPodWithTrackingFinalizer(pod *v1.Pod) bool { + if pod == nil { + return false + } + return (pod.Status.Phase == v1.PodFailed || pod.Status.Phase == v1.PodSucceeded) && hasJobTrackingFinalizer(pod) +} diff --git a/pkg/controller/job/tracking_utils_test.go b/pkg/controller/job/tracking_utils_test.go index ed2a17327a0..f82f2663a0e 100644 --- a/pkg/controller/job/tracking_utils_test.go +++ 
b/pkg/controller/job/tracking_utils_test.go @@ -17,10 +17,16 @@ limitations under the License. package job import ( + "fmt" "sync" "testing" "github.com/google/go-cmp/cmp" + batch "k8s.io/api/batch/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/component-base/metrics/testutil" + "k8s.io/kubernetes/pkg/controller/job/metrics" ) func TestUIDTrackingExpectations(t *testing.T) { @@ -116,3 +122,108 @@ func TestUIDTrackingExpectations(t *testing.T) { } } } + +func TestRecordFinishedPodWithTrackingFinalizer(t *testing.T) { + metrics.Register() + cases := map[string]struct { + oldPod *v1.Pod + newPod *v1.Pod + wantAdd int + wantDelete int + }{ + "new non-finished Pod with finalizer": { + newPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{batch.JobTrackingFinalizer}, + }, + Status: v1.PodStatus{ + Phase: v1.PodPending, + }, + }, + }, + "pod with finalizer fails": { + oldPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{batch.JobTrackingFinalizer}, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, + newPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{batch.JobTrackingFinalizer}, + }, + Status: v1.PodStatus{ + Phase: v1.PodFailed, + }, + }, + wantAdd: 1, + }, + "pod with finalizer succeeds": { + oldPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{batch.JobTrackingFinalizer}, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + }, + }, + newPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{batch.JobTrackingFinalizer}, + }, + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + }, + wantAdd: 1, + }, + "succeeded pod loses finalizer": { + oldPod: &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Finalizers: []string{batch.JobTrackingFinalizer}, + }, + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + }, + newPod: &v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + }, + wantDelete: 1, + }, + "pod without 
finalizer removed": { + oldPod: &v1.Pod{ + Status: v1.PodStatus{ + Phase: v1.PodSucceeded, + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + metrics.TerminatedPodsTrackingFinalizerTotal.Reset() + recordFinishedPodWithTrackingFinalizer(tc.oldPod, tc.newPod) + if err := validateTerminatedPodsTrackingFinalizerTotal(metrics.Add, tc.wantAdd); err != nil { + t.Errorf("Failed validating terminated_pods_tracking_finalizer_total(add): %v", err) + } + if err := validateTerminatedPodsTrackingFinalizerTotal(metrics.Delete, tc.wantDelete); err != nil { + t.Errorf("Failed validating terminated_pods_tracking_finalizer_total(delete): %v", err) + } + }) + } +} + +func validateTerminatedPodsTrackingFinalizerTotal(event string, want int) error { + got, err := testutil.GetCounterMetricValue(metrics.TerminatedPodsTrackingFinalizerTotal.WithLabelValues(event)) + if err != nil { + return err + } + if int(got) != want { + return fmt.Errorf("got value %d, want %d", int(got), want) + } + return nil +} diff --git a/test/integration/job/job_test.go b/test/integration/job/job_test.go index 9c380102e94..f3ba4314b7a 100644 --- a/test/integration/job/job_test.go +++ b/test/integration/job/job_test.go @@ -78,9 +78,7 @@ func TestMetrics(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "simple") defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(restConfig) - defer func() { - cancel() - }() + defer cancel() testCases := map[string]struct { job *batchv1.Job @@ -144,13 +142,14 @@ func TestMetrics(t *testing.T) { validateJobSucceeded(ctx, t, clientSet, jobObj) // verify metric values after the job is finished - validateMetricValue(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric) - validateMetricValue(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric) + validateCounterMetric(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric) + validateCounterMetric(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric) + 
validateTerminatedPodsTrackingFinalizerMetric(t, int(*jobObj.Spec.Parallelism)) }) } } -func validateMetricValue(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) { +func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) { t.Helper() var cmpErr error err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { @@ -166,13 +165,24 @@ func validateMetricValue(t *testing.T, counterVec *basemetrics.CounterVec, wantM return true, nil }) if err != nil { - t.Errorf("Failed waiting for expected metric delta: %q", err) + t.Errorf("Failed waiting for expected metric: %q", err) } if cmpErr != nil { t.Error(cmpErr) } } +func validateTerminatedPodsTrackingFinalizerMetric(t *testing.T, want int) { + validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{ + Value: want, + Labels: []string{metrics.Add}, + }) + validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{ + Value: want, + Labels: []string{metrics.Delete}, + }) +} + // TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed // in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on // the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. 
This step is @@ -238,6 +248,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi defer func() { cancel() }() + resetMetrics() restConfig.QPS = 200 restConfig.Burst = 200 @@ -556,6 +567,7 @@ func TestParallelJob(t *testing.T) { defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(restConfig) defer cancel() + resetMetrics() jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ Spec: batchv1.JobSpec{ @@ -631,6 +643,9 @@ func TestParallelJob(t *testing.T) { } validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) + if tc.trackWithFinalizers { + validateTerminatedPodsTrackingFinalizerMetric(t, 7) + } }) } } @@ -803,9 +818,8 @@ func TestIndexedJob(t *testing.T) { closeFn, restConfig, clientSet, ns := setup(t, "indexed") defer closeFn() ctx, cancel := startJobControllerAndWaitForCaches(restConfig) - defer func() { - cancel() - }() + defer cancel() + resetMetrics() mode := batchv1.IndexedCompletion jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{ @@ -863,6 +877,9 @@ func TestIndexedJob(t *testing.T) { validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3") validateJobSucceeded(ctx, t, clientSet, jobObj) validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj) + if wFinalizers { + validateTerminatedPodsTrackingFinalizerMetric(t, 5) + } }) } } @@ -957,6 +974,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { restConfig.QPS = 1 restConfig.Burst = 1 jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet) + resetMetrics() defer cancel() restConfig.QPS = 200 restConfig.Burst = 200 @@ -989,6 +1007,8 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) { t.Fatalf("Failed to delete job: %v", err) } validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj) + // Pods never finished, so they are not counted in the metric. 
+ validateTerminatedPodsTrackingFinalizerMetric(t, 0) }) } } @@ -1676,6 +1696,7 @@ func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context. } func resetMetrics() { + metrics.TerminatedPodsTrackingFinalizerTotal.Reset() metrics.JobFinishedNum.Reset() metrics.JobPodsFinished.Reset() }