Add metric for terminated pods with tracking finalizer

Change-Id: I26f3169588c30ed82250cb7baff8e277f8d13bb7
This commit is contained in:
Aldo Culquicondor 2022-10-19 09:49:38 -04:00
parent 7423cc931f
commit 12d308f5c4
5 changed files with 197 additions and 19 deletions

View File

@ -246,6 +246,7 @@ func (jm *Controller) resolveControllerRef(namespace string, controllerRef *meta
// When a pod is created, enqueue the controller that manages it and update its expectations.
func (jm *Controller) addPod(obj interface{}) {
pod := obj.(*v1.Pod)
recordFinishedPodWithTrackingFinalizer(nil, pod)
if pod.DeletionTimestamp != nil {
// on a restart of the controller, it's possible a new pod shows up in a state that
// is already pending deletion. Prevent the pod from being a creation observation.
@ -288,6 +289,7 @@ func (jm *Controller) addPod(obj interface{}) {
func (jm *Controller) updatePod(old, cur interface{}) {
curPod := cur.(*v1.Pod)
oldPod := old.(*v1.Pod)
recordFinishedPodWithTrackingFinalizer(oldPod, curPod)
if curPod.ResourceVersion == oldPod.ResourceVersion {
// Periodic resync will send update events for all known pods.
// Two different versions of the same pod will always have different RVs.
@ -362,6 +364,9 @@ func (jm *Controller) updatePod(old, cur interface{}) {
// obj could be an *v1.Pod, or a DeleteFinalStateUnknown marker item.
func (jm *Controller) deletePod(obj interface{}, final bool) {
pod, ok := obj.(*v1.Pod)
if final {
recordFinishedPodWithTrackingFinalizer(pod, nil)
}
// When a delete is dropped, the relist will notice a pod in the store not
// in the list, leading to the insertion of a tombstone object which contains
@ -1673,15 +1678,6 @@ func removeTrackingAnnotationPatch(job *batch.Job) []byte {
return patchBytes
}
// hasJobTrackingFinalizer reports whether the pod's finalizer list contains
// batch.JobTrackingFinalizer.
func hasJobTrackingFinalizer(pod *v1.Pod) bool {
	finalizers := pod.Finalizers
	for i := range finalizers {
		if finalizers[i] == batch.JobTrackingFinalizer {
			return true
		}
	}
	return false
}
type uncountedTerminatedPods struct {
succeeded sets.String
failed sets.String

View File

@ -83,6 +83,18 @@ var (
Help: "The number of finished Pods that are fully tracked",
},
[]string{"completion_mode", "result"})
// TerminatedPodsTrackingFinalizerTotal records the addition and removal of
// terminated pods that have the finalizer batch.kubernetes.io/job-tracking,
// regardless of whether they are owned by a Job.
TerminatedPodsTrackingFinalizerTotal = metrics.NewCounterVec(
&metrics.CounterOpts{
Subsystem: JobControllerSubsystem,
Name: "terminated_pods_tracking_finalizer_total",
Help: `The number of terminated pods (phase=Failed|Succeeded)
that have the finalizer batch.kubernetes.io/job-tracking
The event label can be "add" or "delete".`,
}, []string{"event"})
)
const (
@ -109,6 +121,11 @@ const (
Succeeded = "succeeded"
Failed = "failed"
// Possible values for the "event" label in the
// terminated_pods_tracking_finalizer_total metric.
Add = "add"
Delete = "delete"
)
var registerMetrics sync.Once
@ -120,5 +137,6 @@ func Register() {
legacyregistry.MustRegister(JobSyncNum)
legacyregistry.MustRegister(JobFinishedNum)
legacyregistry.MustRegister(JobPodsFinished)
legacyregistry.MustRegister(TerminatedPodsTrackingFinalizerTotal)
})
}

View File

@ -20,9 +20,12 @@ import (
"fmt"
"sync"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller/job/metrics"
)
// uidSetKeyFunc to parse out the key from a uidSet.
@ -118,3 +121,32 @@ func (u *uidTrackingExpectations) deleteExpectations(jobKey string) {
// newUIDTrackingExpectations returns a uidTrackingExpectations whose store is
// a new cache.Store keyed by uidSetKeyFunc.
func newUIDTrackingExpectations() *uidTrackingExpectations {
	store := cache.NewStore(uidSetKeyFunc)
	return &uidTrackingExpectations{store: store}
}
// hasJobTrackingFinalizer reports whether batch.JobTrackingFinalizer appears
// among the pod's finalizers.
func hasJobTrackingFinalizer(pod *v1.Pod) bool {
	for _, name := range pod.Finalizers {
		if name != batch.JobTrackingFinalizer {
			continue
		}
		return true
	}
	return false
}
// recordFinishedPodWithTrackingFinalizer increments the
// TerminatedPodsTrackingFinalizerTotal counter when a pod transitions into or
// out of the "finished with tracking finalizer" state between oldPod and
// newPod. Either argument may be nil; a nil pod is treated as not being in
// that state. Nothing is recorded when the state is the same on both sides.
func recordFinishedPodWithTrackingFinalizer(oldPod, newPod *v1.Pod) {
	before := isFinishedPodWithTrackingFinalizer(oldPod)
	after := isFinishedPodWithTrackingFinalizer(newPod)
	switch {
	case before == after:
		// No transition, nothing to count.
		return
	case after:
		metrics.TerminatedPodsTrackingFinalizerTotal.WithLabelValues(metrics.Add).Inc()
	default:
		metrics.TerminatedPodsTrackingFinalizerTotal.WithLabelValues(metrics.Delete).Inc()
	}
}
// isFinishedPodWithTrackingFinalizer reports whether pod is non-nil, is in
// phase Failed or Succeeded, and carries the job tracking finalizer.
func isFinishedPodWithTrackingFinalizer(pod *v1.Pod) bool {
	if pod == nil {
		return false
	}
	switch pod.Status.Phase {
	case v1.PodFailed, v1.PodSucceeded:
		return hasJobTrackingFinalizer(pod)
	default:
		return false
	}
}

View File

@ -17,10 +17,16 @@ limitations under the License.
package job
import (
"fmt"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
batch "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/component-base/metrics/testutil"
"k8s.io/kubernetes/pkg/controller/job/metrics"
)
func TestUIDTrackingExpectations(t *testing.T) {
@ -116,3 +122,108 @@ func TestUIDTrackingExpectations(t *testing.T) {
}
}
}
// TestRecordFinishedPodWithTrackingFinalizer checks, for several old/new pod
// pairs, how many "add" and "delete" events
// recordFinishedPodWithTrackingFinalizer records in the
// TerminatedPodsTrackingFinalizerTotal counter.
func TestRecordFinishedPodWithTrackingFinalizer(t *testing.T) {
	metrics.Register()

	// makePod builds a Pod in the given phase; when tracked is true the pod
	// carries the job tracking finalizer, otherwise it has no finalizers.
	makePod := func(phase v1.PodPhase, tracked bool) *v1.Pod {
		pod := &v1.Pod{
			Status: v1.PodStatus{
				Phase: phase,
			},
		}
		if tracked {
			pod.ObjectMeta = metav1.ObjectMeta{
				Finalizers: []string{batch.JobTrackingFinalizer},
			}
		}
		return pod
	}

	type testCase struct {
		oldPod     *v1.Pod
		newPod     *v1.Pod
		wantAdd    int
		wantDelete int
	}
	cases := map[string]testCase{
		"new non-finished Pod with finalizer": {
			newPod: makePod(v1.PodPending, true),
		},
		"pod with finalizer fails": {
			oldPod:  makePod(v1.PodRunning, true),
			newPod:  makePod(v1.PodFailed, true),
			wantAdd: 1,
		},
		"pod with finalizer succeeds": {
			oldPod:  makePod(v1.PodRunning, true),
			newPod:  makePod(v1.PodSucceeded, true),
			wantAdd: 1,
		},
		"succeeded pod loses finalizer": {
			oldPod:     makePod(v1.PodSucceeded, true),
			newPod:     makePod(v1.PodSucceeded, false),
			wantDelete: 1,
		},
		"pod without finalizer removed": {
			oldPod: makePod(v1.PodSucceeded, false),
		},
	}

	for name, tc := range cases {
		t.Run(name, func(t *testing.T) {
			// Start every case from zeroed counters so the expected values are absolute.
			metrics.TerminatedPodsTrackingFinalizerTotal.Reset()
			recordFinishedPodWithTrackingFinalizer(tc.oldPod, tc.newPod)

			addErr := validateTerminatedPodsTrackingFinalizerTotal(metrics.Add, tc.wantAdd)
			if addErr != nil {
				t.Errorf("Failed validating terminated_pods_tracking_finalizer_total(add): %v", addErr)
			}
			deleteErr := validateTerminatedPodsTrackingFinalizerTotal(metrics.Delete, tc.wantDelete)
			if deleteErr != nil {
				t.Errorf("Failed validating terminated_pods_tracking_finalizer_total(delete): %v", deleteErr)
			}
		})
	}
}
func validateTerminatedPodsTrackingFinalizerTotal(event string, want int) error {
got, err := testutil.GetCounterMetricValue(metrics.TerminatedPodsTrackingFinalizerTotal.WithLabelValues(event))
if err != nil {
return err
}
if int(got) != want {
return fmt.Errorf("got value %d, want %d", int(got), want)
}
return nil
}

View File

@ -78,9 +78,7 @@ func TestMetrics(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "simple")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
defer cancel()
testCases := map[string]struct {
job *batchv1.Job
@ -144,13 +142,14 @@ func TestMetrics(t *testing.T) {
validateJobSucceeded(ctx, t, clientSet, jobObj)
// verify metric values after the job is finished
validateMetricValue(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
validateMetricValue(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric)
validateCounterMetric(t, metrics.JobFinishedNum, tc.wantJobFinishedNumMetric)
validateCounterMetric(t, metrics.JobPodsFinished, tc.wantJobPodsFinishedMetric)
validateTerminatedPodsTrackingFinalizerMetric(t, int(*jobObj.Spec.Parallelism))
})
}
}
func validateMetricValue(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
func validateCounterMetric(t *testing.T, counterVec *basemetrics.CounterVec, wantMetric metricLabelsWithValue) {
t.Helper()
var cmpErr error
err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
@ -166,13 +165,24 @@ func validateMetricValue(t *testing.T, counterVec *basemetrics.CounterVec, wantM
return true, nil
})
if err != nil {
t.Errorf("Failed waiting for expected metric delta: %q", err)
t.Errorf("Failed waiting for expected metric: %q", err)
}
if cmpErr != nil {
t.Error(cmpErr)
}
}
// validateTerminatedPodsTrackingFinalizerMetric checks, through
// validateCounterMetric, that the TerminatedPodsTrackingFinalizerTotal counter
// holds the value want for both the "add" and the "delete" event labels.
func validateTerminatedPodsTrackingFinalizerMetric(t *testing.T, want int) {
	// Mark this wrapper as a helper so that failures reported by
	// validateCounterMetric are attributed to the calling test's line.
	t.Helper()
	validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
		Value:  want,
		Labels: []string{metrics.Add},
	})
	validateCounterMetric(t, metrics.TerminatedPodsTrackingFinalizerTotal, metricLabelsWithValue{
		Value:  want,
		Labels: []string{metrics.Delete},
	})
}
// TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart verifies that the job is properly marked as Failed
// in a scenario when the job controller crashes between removing pod finalizers and marking the job as Failed (based on
// the pod failure policy). After the finalizer for the failed pod is removed we remove the failed pod. This step is
@ -238,6 +248,7 @@ func TestJobPodFailurePolicyWithFailedPodDeletedDuringControllerRestart(t *testi
defer func() {
cancel()
}()
resetMetrics()
restConfig.QPS = 200
restConfig.Burst = 200
@ -556,6 +567,7 @@ func TestParallelJob(t *testing.T) {
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer cancel()
resetMetrics()
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
Spec: batchv1.JobSpec{
@ -631,6 +643,9 @@ func TestParallelJob(t *testing.T) {
}
validateJobPodsStatus(ctx, t, clientSet, jobObj, want, false)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
if tc.trackWithFinalizers {
validateTerminatedPodsTrackingFinalizerMetric(t, 7)
}
})
}
}
@ -803,9 +818,8 @@ func TestIndexedJob(t *testing.T) {
closeFn, restConfig, clientSet, ns := setup(t, "indexed")
defer closeFn()
ctx, cancel := startJobControllerAndWaitForCaches(restConfig)
defer func() {
cancel()
}()
defer cancel()
resetMetrics()
mode := batchv1.IndexedCompletion
jobObj, err := createJobWithDefaults(ctx, clientSet, ns.Name, &batchv1.Job{
@ -863,6 +877,9 @@ func TestIndexedJob(t *testing.T) {
validateIndexedJobPods(ctx, t, clientSet, jobObj, nil, "0-3")
validateJobSucceeded(ctx, t, clientSet, jobObj)
validateFinishedPodsNoFinalizer(ctx, t, clientSet, jobObj)
if wFinalizers {
validateTerminatedPodsTrackingFinalizerMetric(t, 5)
}
})
}
}
@ -957,6 +974,7 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
restConfig.QPS = 1
restConfig.Burst = 1
jc, ctx, cancel := createJobControllerWithSharedInformers(restConfig, informerSet)
resetMetrics()
defer cancel()
restConfig.QPS = 200
restConfig.Burst = 200
@ -989,6 +1007,8 @@ func TestOrphanPodsFinalizersClearedWithGC(t *testing.T) {
t.Fatalf("Failed to delete job: %v", err)
}
validateNoOrphanPodsWithFinalizers(ctx, t, clientSet, jobObj)
// Pods never finished, so they are not counted in the metric.
validateTerminatedPodsTrackingFinalizerMetric(t, 0)
})
}
}
@ -1676,6 +1696,7 @@ func startJobControllerAndWaitForCaches(restConfig *restclient.Config) (context.
}
func resetMetrics() {
metrics.TerminatedPodsTrackingFinalizerTotal.Reset()
metrics.JobFinishedNum.Reset()
metrics.JobPodsFinished.Reset()
}