mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-13 22:05:59 +00:00
dra: revamp event handlers in kube-controller-manager
Enabling logging is useful for tracking what the code is doing. There are also some functional changes:
- The pod handler now checks for the existence of claims. This avoids adding pods to the work queue in more cases where nothing needs to be done, at the cost of making the event handlers a bit slower. This will become more important when more work is added to the controller.
- The handler for deleted ResourceClaims did not check for cache.DeletedFinalStateUnknown; it now unwraps such objects before processing them.
This commit is contained in:
parent
08d40f53a7
commit
5cec6d798c
@ -152,13 +152,16 @@ func NewController(
|
|||||||
}
|
}
|
||||||
if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
if _, err := claimInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: func(obj interface{}) {
|
AddFunc: func(obj interface{}) {
|
||||||
ec.onResourceClaimAddOrUpdate(logger, obj)
|
logger.V(6).Info("new claim", "claimDump", obj)
|
||||||
|
ec.enqueueResourceClaim(logger, obj, false)
|
||||||
},
|
},
|
||||||
UpdateFunc: func(old, updated interface{}) {
|
UpdateFunc: func(old, updated interface{}) {
|
||||||
ec.onResourceClaimAddOrUpdate(logger, updated)
|
logger.V(6).Info("updated claim", "claimDump", updated)
|
||||||
|
ec.enqueueResourceClaim(logger, updated, false)
|
||||||
},
|
},
|
||||||
DeleteFunc: func(obj interface{}) {
|
DeleteFunc: func(obj interface{}) {
|
||||||
ec.onResourceClaimDelete(logger, obj)
|
logger.V(6).Info("deleted claim", "claimDump", obj)
|
||||||
|
ec.enqueueResourceClaim(logger, obj, true)
|
||||||
},
|
},
|
||||||
}); err != nil {
|
}); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -199,6 +202,7 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo
|
|||||||
pod, ok := obj.(*v1.Pod)
|
pod, ok := obj.(*v1.Pod)
|
||||||
if !ok {
|
if !ok {
|
||||||
// Not a pod?!
|
// Not a pod?!
|
||||||
|
logger.Error(nil, "enqueuePod called for unexpected object", "type", fmt.Sprintf("%T", obj))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -208,13 +212,14 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo
|
|||||||
}
|
}
|
||||||
|
|
||||||
if deleted {
|
if deleted {
|
||||||
|
logger.V(6).Info("pod got deleted", "pod", klog.KObj(pod))
|
||||||
ec.deletedObjects.Add(pod.UID)
|
ec.deletedObjects.Add(pod.UID)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted)
|
logger.V(6).Info("pod with resource claims changed", "pod", klog.KObj(pod), "deleted", deleted)
|
||||||
|
|
||||||
// Release reservations of a deleted or completed pod?
|
// Release reservations of a deleted or completed pod?
|
||||||
if deleted || isPodDone(pod) {
|
if needsClaims, reason := podNeedsClaims(pod, deleted); !needsClaims {
|
||||||
for _, podClaim := range pod.Spec.ResourceClaims {
|
for _, podClaim := range pod.Spec.ResourceClaims {
|
||||||
claimName, _, err := resourceclaim.Name(pod, &podClaim)
|
claimName, _, err := resourceclaim.Name(pod, &podClaim)
|
||||||
switch {
|
switch {
|
||||||
@ -222,68 +227,119 @@ func (ec *Controller) enqueuePod(logger klog.Logger, obj interface{}, deleted bo
|
|||||||
// Either the claim was not created (nothing to do here) or
|
// Either the claim was not created (nothing to do here) or
|
||||||
// the API changed. The later will also get reported elsewhere,
|
// the API changed. The later will also get reported elsewhere,
|
||||||
// so here it's just a debug message.
|
// so here it's just a debug message.
|
||||||
klog.TODO().V(6).Info("Nothing to do for claim during pod change", "err", err)
|
logger.V(6).Info("Nothing to do for claim during pod change", "err", err, "reason", reason)
|
||||||
case claimName != nil:
|
case claimName != nil:
|
||||||
key := claimKeyPrefix + pod.Namespace + "/" + *claimName
|
key := claimKeyPrefix + pod.Namespace + "/" + *claimName
|
||||||
logger.V(6).Info("pod is deleted or done, process claim", "pod", klog.KObj(pod), "key", key)
|
logger.V(6).Info("Process claim", "pod", klog.KObj(pod), "key", key, "reason", reason)
|
||||||
ec.queue.Add(claimKeyPrefix + pod.Namespace + "/" + *claimName)
|
ec.queue.Add(key)
|
||||||
default:
|
default:
|
||||||
// Nothing to do, claim wasn't generated.
|
// Nothing to do, claim wasn't generated.
|
||||||
klog.TODO().V(6).Info("Nothing to do for skipped claim during pod change")
|
logger.V(6).Info("Nothing to do for skipped claim during pod change", "reason", reason)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create ResourceClaim for inline templates?
|
needsWork, reason := ec.podNeedsWork(pod)
|
||||||
if pod.DeletionTimestamp == nil {
|
if needsWork {
|
||||||
for _, podClaim := range pod.Spec.ResourceClaims {
|
logger.V(6).Info("enqueing pod", "pod", klog.KObj(pod), "reason", reason)
|
||||||
|
ec.queue.Add(podKeyPrefix + pod.Namespace + "/" + pod.Name)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
logger.V(6).Info("not enqueing pod", "pod", klog.KObj(pod), "reason", reason)
|
||||||
|
}
|
||||||
|
|
||||||
|
func podNeedsClaims(pod *v1.Pod, deleted bool) (bool, string) {
|
||||||
|
if deleted {
|
||||||
|
return false, "pod got removed"
|
||||||
|
}
|
||||||
|
if podutil.IsPodTerminal(pod) {
|
||||||
|
return false, "pod has terminated"
|
||||||
|
}
|
||||||
|
if pod.DeletionTimestamp != nil && pod.Spec.NodeName == "" {
|
||||||
|
return false, "pod got deleted before scheduling"
|
||||||
|
}
|
||||||
|
// Still needs claims.
|
||||||
|
return true, "pod might run"
|
||||||
|
}
|
||||||
|
|
||||||
|
// podNeedsWork checks whether a new or modified pod needs to be processed
|
||||||
|
// further by a worker. It returns a boolean with the result and an explanation
|
||||||
|
// for it.
|
||||||
|
func (ec *Controller) podNeedsWork(pod *v1.Pod) (bool, string) {
|
||||||
|
if pod.DeletionTimestamp != nil {
|
||||||
|
// Nothing else to do for the pod.
|
||||||
|
return false, "pod is deleted"
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, podClaim := range pod.Spec.ResourceClaims {
|
||||||
|
claimName, checkOwner, err := resourceclaim.Name(pod, &podClaim)
|
||||||
|
if err != nil {
|
||||||
|
return true, err.Error()
|
||||||
|
}
|
||||||
|
// If the claimName is nil, then it has been determined before
|
||||||
|
// that the claim is not needed.
|
||||||
|
if claimName == nil {
|
||||||
|
return false, "claim is not needed"
|
||||||
|
}
|
||||||
|
claim, err := ec.claimLister.ResourceClaims(pod.Namespace).Get(*claimName)
|
||||||
|
if apierrors.IsNotFound(err) {
|
||||||
if podClaim.Source.ResourceClaimTemplateName != nil {
|
if podClaim.Source.ResourceClaimTemplateName != nil {
|
||||||
// It has at least one inline template, work on it.
|
return true, "must create ResourceClaim from template"
|
||||||
key := podKeyPrefix + pod.Namespace + "/" + pod.Name
|
|
||||||
logger.V(6).Info("pod is not deleted, process it", "pod", klog.KObj(pod), "key", key)
|
|
||||||
ec.queue.Add(key)
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
|
// User needs to create claim.
|
||||||
|
return false, "claim is missing and must be created by user"
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
// Shouldn't happen.
|
||||||
|
return true, fmt.Sprintf("internal error while checking for claim: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if checkOwner &&
|
||||||
|
resourceclaim.IsForPod(pod, claim) != nil {
|
||||||
|
// Cannot proceed with the pod unless that other claim gets deleted.
|
||||||
|
return false, "conflicting claim needs to be removed by user"
|
||||||
|
}
|
||||||
|
|
||||||
|
if pod.Spec.NodeName == "" {
|
||||||
|
// Scheduler will handle PodSchedulingContext and reservations.
|
||||||
|
return false, "pod not scheduled"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return false, "nothing to do"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ec *Controller) onResourceClaimAddOrUpdate(logger klog.Logger, obj interface{}) {
|
func (ec *Controller) enqueueResourceClaim(logger klog.Logger, obj interface{}, deleted bool) {
|
||||||
|
if d, ok := obj.(cache.DeletedFinalStateUnknown); ok {
|
||||||
|
obj = d.Obj
|
||||||
|
}
|
||||||
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
|
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// When starting up, we have to check all claims to find those with
|
if !deleted {
|
||||||
// stale pods in ReservedFor. During an update, a pod might get added
|
// When starting up, we have to check all claims to find those with
|
||||||
// that already no longer exists.
|
// stale pods in ReservedFor. During an update, a pod might get added
|
||||||
key := claimKeyPrefix + claim.Namespace + "/" + claim.Name
|
// that already no longer exists.
|
||||||
logger.V(6).Info("claim is new or updated, process it", "key", key)
|
key := claimKeyPrefix + claim.Namespace + "/" + claim.Name
|
||||||
ec.queue.Add(key)
|
logger.V(6).Info("enqueing new or updated claim", "claim", klog.KObj(claim), "key", key)
|
||||||
}
|
ec.queue.Add(key)
|
||||||
|
} else {
|
||||||
func (ec *Controller) onResourceClaimDelete(logger klog.Logger, obj interface{}) {
|
logger.V(6).Info("not enqueing deleted claim", "claim", klog.KObj(claim))
|
||||||
claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Someone deleted a ResourceClaim, either intentionally or
|
// Also check whether this causes work for any of the currently
|
||||||
// accidentally. If there is a pod referencing it because of
|
// known pods which use the ResourceClaim.
|
||||||
// an inline resource, then we should re-create the ResourceClaim.
|
|
||||||
// The common indexer does some prefiltering for us by
|
|
||||||
// limiting the list to those pods which reference
|
|
||||||
// the ResourceClaim.
|
|
||||||
objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
|
objs, err := ec.podIndexer.ByIndex(podResourceClaimIndex, fmt.Sprintf("%s/%s", claim.Namespace, claim.Name))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
runtime.HandleError(fmt.Errorf("listing pods from cache: %v", err))
|
logger.Error(err, "listing pods from cache")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(objs) == 0 {
|
if len(objs) == 0 {
|
||||||
logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim))
|
logger.V(6).Info("claim got deleted while not needed by any pod, nothing to do", "claim", klog.KObj(claim))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger = klog.LoggerWithValues(logger, "claim", klog.KObj(claim))
|
|
||||||
for _, obj := range objs {
|
for _, obj := range objs {
|
||||||
ec.enqueuePod(logger, obj, false)
|
ec.enqueuePod(logger, obj, false)
|
||||||
}
|
}
|
||||||
|
@ -343,7 +343,7 @@ func TestSyncHandler(t *testing.T) {
|
|||||||
claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
|
claimInformer := informerFactory.Resource().V1alpha2().ResourceClaims()
|
||||||
templateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
|
templateInformer := informerFactory.Resource().V1alpha2().ResourceClaimTemplates()
|
||||||
|
|
||||||
ec, err := NewController(klog.TODO(), fakeKubeClient, podInformer, claimInformer, templateInformer)
|
ec, err := NewController(klog.FromContext(ctx), fakeKubeClient, podInformer, claimInformer, templateInformer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error creating ephemeral controller : %v", err)
|
t.Fatalf("error creating ephemeral controller : %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user