From 79601acb2cc18440af8fc8d193911f6d291f9fb7 Mon Sep 17 00:00:00 2001 From: "Bobby (Babak) Salamat" Date: Tue, 23 Jan 2018 12:03:35 -0800 Subject: [PATCH] Add better event handling for deleted Pods --- pkg/scheduler/factory/factory.go | 82 ++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 24 deletions(-) diff --git a/pkg/scheduler/factory/factory.go b/pkg/scheduler/factory/factory.go index cfde827ca9b..c9426297b9c 100644 --- a/pkg/scheduler/factory/factory.go +++ b/pkg/scheduler/factory/factory.go @@ -189,6 +189,13 @@ func NewConfigFactory( switch t := obj.(type) { case *v1.Pod: return assignedNonTerminatedPod(t) + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return assignedNonTerminatedPod(pod) + } else { + runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c)) + return false + } default: runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj)) return false @@ -208,36 +215,22 @@ func NewConfigFactory( switch t := obj.(type) { case *v1.Pod: return unassignedNonTerminatedPod(t) + case cache.DeletedFinalStateUnknown: + if pod, ok := t.Obj.(*v1.Pod); ok { + return unassignedNonTerminatedPod(pod) + } else { + runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c)) + return false + } default: runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj)) return false } }, Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: func(obj interface{}) { - if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil { - runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) - } - }, - UpdateFunc: func(oldObj, newObj interface{}) { - pod := newObj.(*v1.Pod) - if c.skipPodUpdate(pod) { - return - } - if err := c.podQueue.Update(pod); err != nil { - runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) - } - }, - DeleteFunc: func(obj interface{}) { - pod := obj.(*v1.Pod) - if err := c.podQueue.Delete(pod); err != nil { - runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) - } - if c.volumeBinder != nil { - // Volume binder only wants to keep unassigned pods - c.volumeBinder.DeletePodBindings(pod) - } - }, + AddFunc: c.addPodToSchedulingQueue, + UpdateFunc: c.updatePodInSchedulingQueue, + DeleteFunc: c.deletePodFromSchedulingQueue, }, }, ) @@ -591,6 +584,47 @@ func (c *configFactory) updatePodInCache(oldObj, newObj interface{}) { c.podQueue.AssignedPodUpdated(newPod) } +func (c *configFactory) addPodToSchedulingQueue(obj interface{}) { + if err := c.podQueue.Add(obj.(*v1.Pod)); err != nil { + runtime.HandleError(fmt.Errorf("unable to queue %T: %v", obj, err)) + } +} + +func (c *configFactory) updatePodInSchedulingQueue(oldObj, newObj interface{}) { + pod := newObj.(*v1.Pod) + if c.skipPodUpdate(pod) { + return + } + if err := c.podQueue.Update(pod); err != nil { + runtime.HandleError(fmt.Errorf("unable to update %T: %v", newObj, err)) + } +} + +func (c *configFactory) deletePodFromSchedulingQueue(obj interface{}) { + var pod *v1.Pod + switch t := obj.(type) { + case *v1.Pod: + pod = obj.(*v1.Pod) + case cache.DeletedFinalStateUnknown: + var ok bool + pod, ok = t.Obj.(*v1.Pod) + if !ok { + runtime.HandleError(fmt.Errorf("unable to convert object %T to *v1.Pod in %T", obj, c)) + return + } + default: + runtime.HandleError(fmt.Errorf("unable to handle object in %T: %T", c, obj)) + return + } + if err := c.podQueue.Delete(pod); err != nil { + runtime.HandleError(fmt.Errorf("unable to dequeue %T: %v", obj, err)) + } + if c.volumeBinder != nil { + // Volume binder only wants to keep unassigned pods + c.volumeBinder.DeletePodBindings(pod) + } +} + func (c *configFactory) invalidateCachedPredicatesOnUpdatePod(newPod *v1.Pod, oldPod *v1.Pod) { if c.enableEquivalenceClassCache { // if the pod does not have bound node, updating equivalence cache is meaningless;