Improve performance of Job controller delete event handler

This commit is contained in:
Harish Kuna 2024-09-15 18:11:39 +00:00
parent af2bf2d8f1
commit 77a3de3cfb
2 changed files with 114 additions and 24 deletions

View File

@ -113,7 +113,7 @@ type Controller struct {
queue workqueue.TypedRateLimitingInterface[string]
// Orphan deleted pods that still have a Job tracking finalizer to be removed
orphanQueue workqueue.TypedRateLimitingInterface[string]
orphanQueue workqueue.TypedRateLimitingInterface[orphanPodKey]
broadcaster record.EventBroadcaster
recorder record.EventRecorder
@ -143,6 +143,23 @@ type syncJobCtx struct {
ready int32
}
// orphanPodKeyKind tells how the value of an orphanPodKey must be interpreted.
type orphanPodKeyKind int
const (
// OrphanPodKeyKindName marks a key whose value is the name of a single pod.
OrphanPodKeyKindName orphanPodKeyKind = iota
// OrphanPodKeyKindSelector marks a key whose value is the string form of a
// label selector that matches a set of pods.
OrphanPodKeyKindSelector
)
// orphanPodKey is the item type stored in the controller's orphanQueue.
// It identifies either one pod by name or a group of pods by label selector,
// always within a single namespace.
type orphanPodKey struct {
// kind selects the interpretation of value: OrphanPodKeyKindName or
// OrphanPodKeyKindSelector.
kind orphanPodKeyKind
// namespace is the namespace of the pod, or the namespace in which the
// selector is evaluated.
namespace string
// value is the pod name when kind is OrphanPodKeyKindName, or the label
// selector string when kind is OrphanPodKeyKindSelector.
value string
}
// NewController creates a new Job controller that keeps the relevant pods
// in sync with their corresponding Job objects.
func NewController(ctx context.Context, podInformer coreinformers.PodInformer, jobInformer batchinformers.JobInformer, kubeClient clientset.Interface) (*Controller, error) {
@ -162,7 +179,7 @@ func newControllerWithClock(ctx context.Context, podInformer coreinformers.PodIn
expectations: controller.NewControllerExpectations(),
finalizerExpectations: newUIDTrackingExpectations(),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[string](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[string]{Name: "job", Clock: clock}),
orphanQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[string](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[string]{Name: "job_orphan_pod", Clock: clock}),
orphanQueue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedItemExponentialFailureRateLimiter[orphanPodKey](DefaultJobApiBackOff, MaxJobApiBackOff), workqueue.TypedRateLimitingQueueConfig[orphanPodKey]{Name: "job_orphan_pod", Clock: clock}),
broadcaster: eventBroadcaster,
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "job-controller"}),
clock: clock,
@ -513,7 +530,17 @@ func (jm *Controller) deleteJob(logger klog.Logger, obj interface{}) {
return
}
}
jm.cleanupPodFinalizers(jobObj)
selector, err := metav1.LabelSelectorAsSelector(jobObj.Spec.Selector)
if err != nil {
utilruntime.HandleError(fmt.Errorf("job %s/%s has invalid label selector: %w", jobObj.Namespace, jobObj.Name, err))
return
}
orphanPodKey := orphanPodKey{
kind: OrphanPodKeyKindSelector,
namespace: jobObj.Namespace,
value: selector.String(),
}
jm.orphanQueue.Add(orphanPodKey)
}
// enqueueSyncJobImmediately tells the Job controller to invoke syncJob
@ -563,12 +590,12 @@ func (jm *Controller) enqueueSyncJobInternal(logger klog.Logger, obj interface{}
}
// enqueueOrphanPod adds the given pod to orphanQueue.
//
// NOTE(review): this span appears to interleave the pre-change body (a string
// key built with controller.KeyFunc) and the post-change body (an
// orphanPodKey of kind OrphanPodKeyKindName built from the pod's namespace
// and name), because the rendered diff carries no +/- markers. Presumably
// only the orphanPodKey variant is current — confirm against the real file.
func (jm *Controller) enqueueOrphanPod(obj *v1.Pod) {
key, err := controller.KeyFunc(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
return
// NOTE(review): the local variable below shadows the orphanPodKey type for
// the rest of the function; a different local name would avoid that.
orphanPodKey := orphanPodKey{
kind: OrphanPodKeyKindName,
namespace: obj.Namespace,
value: obj.Name,
}
jm.orphanQueue.Add(key)
jm.orphanQueue.Add(orphanPodKey)
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
@ -620,37 +647,70 @@ func (jm *Controller) processNextOrphanPod(ctx context.Context) bool {
}
// syncOrphanPod processes one orphan pod key and dispatches on key.kind:
//   - OrphanPodKeyKindName: the pod named key.value is read from podStore in
//     key.namespace and passed to handleSingleOrphanPod. A NotFound error from
//     the store is logged at V(4) and treated as success (nil); any other
//     lookup error is returned.
//   - OrphanPodKeyKindSelector: the work is delegated to
//     syncOrphanPodsBySelector with key.value as the label selector string.
//   - any other kind: an "unknown key type" error is returned.
//
// The elapsed time of every call is logged at V(4) when the function returns.
//
// NOTE(review): the two signature lines and the cache.SplitMetaNamespaceKey
// line below look like pre-change lines kept next to their replacements by
// the diff view (no +/- markers are present) — confirm against the real file.
func (jm *Controller) syncOrphanPod(ctx context.Context, key string) error {
func (jm *Controller) syncOrphanPod(ctx context.Context, key orphanPodKey) error {
startTime := jm.clock.Now()
logger := klog.FromContext(ctx)
defer func() {
logger.V(4).Info("Finished syncing orphan pod", "pod", key, "elapsed", jm.clock.Since(startTime))
}()
ns, name, err := cache.SplitMetaNamespaceKey(key)
switch key.kind {
case OrphanPodKeyKindName:
// Single pod: look it up by namespace and name.
pod, err := jm.podStore.Pods(key.namespace).Get(key.value)
if err != nil {
if apierrors.IsNotFound(err) {
// The pod is no longer in the store, so there is nothing left to do.
logger.V(4).Info("Orphan pod has been deleted", "pod", klog.KRef(key.namespace, key.value))
return nil
}
return err
}
return jm.handleSingleOrphanPod(ctx, pod)
case OrphanPodKeyKindSelector:
// Group of pods: key.value carries the label selector string.
logger.V(8).Info("syncing all pods matching the label selector", "namespace", key.namespace, "labelSelector", key.value)
return jm.syncOrphanPodsBySelector(ctx, key.namespace, key.value)
default:
return fmt.Errorf("unknown key type: %d", key.kind)
}
}
// syncOrphanPodsBySelector lists, from podStore, the pods in namespace that
// match labelSelector and passes each of them to handleSingleOrphanPod.
//
// labelSelector is parsed with labels.Parse; a parse failure is returned
// wrapped as "invalid label selector". An error from the list call is
// returned unchanged.
//
// NOTE(review): an error from handleSingleOrphanPod is only logged; the loop
// continues and the function still returns nil. The caller therefore cannot
// tell that some pods were not processed. If queue items are retried only
// when an error is returned (TODO confirm in processNextOrphanPod), selector
// keys never get that retry. Return the collected errors if best-effort is
// not the intent.
//
// NOTE(review): the lines below that use ns, name, key and sharedPod have no
// matching declarations in this function; they look like pre-change lines of
// the old syncOrphanPod kept by the diff view (no +/- markers are present) —
// confirm against the real file.
func (jm *Controller) syncOrphanPodsBySelector(ctx context.Context, namespace string, labelSelector string) error {
logger := klog.FromContext(ctx)
selector, err := labels.Parse(labelSelector)
if err != nil {
return err
return fmt.Errorf("invalid label selector: %w", err)
}
sharedPod, err := jm.podStore.Pods(ns).Get(name)
// Fetch all pods that match the label selector.
// This is a relatively expensive operation, but it is only called from the
// orphan reconciler.
pods, err := jm.podStore.Pods(namespace).List(selector)
if err != nil {
if apierrors.IsNotFound(err) {
logger.V(4).Info("Orphan pod has been deleted", "pod", key)
return nil
}
return err
}
for _, pod := range pods {
// A failure on one pod does not stop the remaining pods from being handled.
if err := jm.handleSingleOrphanPod(ctx, pod); err != nil {
logger.Error(err, "syncing orphan pod failed", "pod", klog.KObj(pod))
}
}
return nil
}
// handleSingleOrphanPod processes a single orphan pod.
func (jm *Controller) handleSingleOrphanPod(ctx context.Context, sharedPod *v1.Pod) error {
logger := klog.FromContext(ctx)
ns := sharedPod.Namespace
name := sharedPod.Name
// Make sure the pod is still orphaned.
if controllerRef := metav1.GetControllerOf(sharedPod); controllerRef != nil {
if controllerRef.Kind != controllerKind.Kind || controllerRef.APIVersion != batch.SchemeGroupVersion.String() {
// The pod is controlled by an owner that is not a batch/v1 Job. Do not remove finalizer.
return nil
}
job := jm.resolveControllerRef(sharedPod.Namespace, controllerRef)
job := jm.resolveControllerRef(ns, controllerRef)
if job != nil {
// Skip cleanup of finalizers for pods owned by a job managed by an external controller
if controllerName := managedByExternalController(job); controllerName != nil {
logger.V(2).Info("Skip cleanup of the job finalizer for a pod owned by a job that is managed by an external controller", "key", key, "podUID", sharedPod.UID, "jobUID", job.UID, "controllerName", controllerName)
logger.V(2).Info("Skip cleanup of the job finalizer for a pod owned by a job that is managed by an external controller", "namespace", ns, "name", name, "podUID", sharedPod.UID, "jobUID", job.UID, "controllerName", controllerName)
return nil
}
}

View File

@ -6641,6 +6641,7 @@ func TestSyncOrphanPod(t *testing.T) {
job *batch.Job
inCache bool
wantFinalizerRemoved bool
podSelector *metav1.LabelSelector
}{
"controlled_by_existing_running_job": {
owner: &metav1.OwnerReference{
@ -6743,6 +6744,14 @@ func TestSyncOrphanPod(t *testing.T) {
},
wantFinalizerRemoved: true,
},
"orphan_pods_by_label_selector": {
podSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "test",
},
},
wantFinalizerRemoved: true,
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
@ -6756,19 +6765,35 @@ func TestSyncOrphanPod(t *testing.T) {
}
})
}
podBuilder := buildPod().name(name).deletionTimestamp().trackingFinalizer()
if tc.owner != nil {
podBuilder = podBuilder.owner(*tc.owner)
}
orphanPod := podBuilder.Pod
orphanPod, err := clientset.CoreV1().Pods("default").Create(ctx, orphanPod, metav1.CreateOptions{})
orphanKey := orphanPodKey{
kind: OrphanPodKeyKindName,
namespace: podBuilder.Pod.Namespace,
value: podBuilder.Pod.Name,
}
if tc.podSelector != nil {
podBuilder = podBuilder.labels(tc.podSelector.MatchLabels)
selector, err := metav1.LabelSelectorAsSelector(tc.podSelector)
if err != nil {
t.Fatalf("Error parsing pod label selector: %v", err)
}
orphanKey = orphanPodKey{
kind: OrphanPodKeyKindSelector,
namespace: podBuilder.Pod.Namespace,
value: selector.String(),
}
}
orphanPod, err := clientset.CoreV1().Pods("default").Create(ctx, podBuilder.Pod, metav1.CreateOptions{})
if err != nil {
t.Fatalf("Creating orphan pod: %v", err)
}
err = manager.syncOrphanPod(ctx, cache.MetaObjectToName(orphanPod).String())
// Sync orphan pod by name or selector
err = manager.syncOrphanPod(ctx, orphanKey)
if err != nil {
t.Fatalf("Failed sync orphan pod: %v", err)
t.Fatalf("Failed to sync orphan pod: %v", err)
}
if tc.wantFinalizerRemoved {
if err := wait.PollUntilContextTimeout(ctx, 10*time.Millisecond, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) {
@ -6786,7 +6811,7 @@ func TestSyncOrphanPod(t *testing.T) {
time.Sleep(time.Millisecond)
orphanPod, err := clientset.CoreV1().Pods(orphanPod.Namespace).Get(ctx, orphanPod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to the latest pod: %v", err)
t.Fatalf("Failed to retrieve the latest pod: %v", err)
}
if !hasJobTrackingFinalizer(orphanPod) {
t.Errorf("Unexpected removal of the Job's finalizer")
@ -7796,6 +7821,11 @@ func (pb podBuilder) uid(u string) podBuilder {
return pb
}
// labels sets the given label map on the builder and returns the resulting
// builder so that calls can be chained.
func (pb podBuilder) labels(labels map[string]string) podBuilder {
	// pb is received by value, so next is the same copy the caller gets back.
	next := pb
	next.Labels = labels
	return next
}
func (pb podBuilder) job(j *batch.Job) podBuilder {
pb.Labels = j.Spec.Selector.MatchLabels
pb.Namespace = j.Namespace