diff --git a/pkg/controller/resourceclaim/controller.go b/pkg/controller/resourceclaim/controller.go
index 74527ca8cdc..492c27f65ca 100644
--- a/pkg/controller/resourceclaim/controller.go
+++ b/pkg/controller/resourceclaim/controller.go
@@ -27,7 +27,6 @@ import (
 	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
@@ -59,6 +58,10 @@ const (
 	// and not documented as part of the Kubernetes API.
 	podResourceClaimAnnotation = "resource.kubernetes.io/pod-claim-name"
 
+	// claimPodOwnerIndex is used to find ResourceClaims which have
+	// a specific pod as their owner. The index values are pod UIDs.
+	claimPodOwnerIndex = "claim-pod-owner-index"
+
 	// Field manager used to update the pod status.
 	fieldManager = "ResourceClaimController"
 
@@ -76,6 +79,7 @@ type Controller struct {
 	// therefore the ResourceClaim objects in its store should be treated as immutable.
 	claimLister  resourcev1alpha2listers.ResourceClaimLister
 	claimsSynced cache.InformerSynced
+	claimCache   cache.MutationCache
 
 	// podLister is the shared Pod lister used to fetch Pod
 	// objects from the API server. It is shared with other controllers and
@@ -163,6 +167,28 @@ func NewController(
 		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
 	}
 
+	// The mutation cache acts as an additional layer on top of the
+	// informer cache and, after a create issued by the controller,
+	// keeps returning that object until the informer catches up.
+	// This is necessary when a ResourceClaim was created, the
+	// subsequent pod status update failed, and a retry occurs before
+	// the informer cache is updated. In that scenario, the controller
+	// would create another claim instead of reusing the existing one.
+	claimInformerCache := claimInformer.Informer().GetIndexer()
+	if err := claimInformerCache.AddIndexers(cache.Indexers{claimPodOwnerIndex: claimPodOwnerIndexFunc}); err != nil {
+		return nil, fmt.Errorf("could not initialize ResourceClaim controller: %w", err)
+	}
+	ec.claimCache = cache.NewIntegerResourceVersionMutationCache(claimInformerCache, claimInformerCache,
+		// Very long time to live, unlikely to be needed because
+		// the informer cache should catch up quickly.
+		time.Hour,
+		// Allow storing objects not in the underlying cache - that's the point...
+		// It's safe because, in case of a race (the claim is in the mutation cache,
+		// then gets deleted, and the controller updates the status based on it),
+		// the "bad" pod status gets detected and fixed once the informer catches up.
+		true,
+	)
+
 	return ec, nil
 }
 
@@ -487,6 +513,7 @@ func (ec *Controller) handleClaim(ctx context.Context, pod *v1.Pod, podClaim v1.
 			metrics.ResourceClaimCreateFailures.Inc()
 			return fmt.Errorf("create ResourceClaim %s: %v", claimName, err)
 		}
+		ec.claimCache.Mutation(claim)
 	}
 
 	// Remember the new ResourceClaim for a batch PodStatus update in our caller.
@@ -502,14 +529,16 @@
 // annotation (ties it to the pod claim) and the right ownership (ties it to
 // the pod).
 func (ec *Controller) findPodResourceClaim(pod *v1.Pod, podClaim v1.PodResourceClaim) (*resourcev1alpha2.ResourceClaim, error) {
-	claims, err := ec.claimLister.List(labels.Everything())
+	// Only claims owned by the pod will get returned here.
+	claims, err := ec.claimCache.ByIndex(claimPodOwnerIndex, string(pod.UID))
 	if err != nil {
 		return nil, err
 	}
 	deterministicName := pod.Name + "-" + podClaim.Name // Kubernetes <= 1.27 behavior.
-	for _, claim := range claims {
-		if err := resourceclaim.IsForPod(pod, claim); err != nil {
-			continue
+	for _, claimObj := range claims {
+		claim, ok := claimObj.(*resourcev1alpha2.ResourceClaim)
+		if !ok {
+			return nil, fmt.Errorf("unexpected object of type %T returned by claim cache", claimObj)
 		}
 		podClaimName, ok := claim.Annotations[podResourceClaimAnnotation]
 		if ok && podClaimName != podClaim.Name {
@@ -715,3 +744,22 @@ func isPodDone(pod *v1.Pod) bool {
 		// Deleted and not scheduled:
 		pod.DeletionTimestamp != nil && pod.Spec.NodeName == ""
 }
+
+// claimPodOwnerIndexFunc is an index function that returns the UIDs of
+// all pods which own the resource claim. There should be only one.
+func claimPodOwnerIndexFunc(obj interface{}) ([]string, error) {
+	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
+	if !ok {
+		return nil, nil
+	}
+	var keys []string
+	for _, owner := range claim.OwnerReferences {
+		if owner.Controller != nil &&
+			*owner.Controller &&
+			owner.APIVersion == "v1" &&
+			owner.Kind == "Pod" {
+			keys = append(keys, string(owner.UID))
+		}
+	}
+	return keys, nil
+}
diff --git a/pkg/controller/resourceclaim/controller_test.go b/pkg/controller/resourceclaim/controller_test.go
index 8b8dcf7e044..6384887d9a2 100644
--- a/pkg/controller/resourceclaim/controller_test.go
+++ b/pkg/controller/resourceclaim/controller_test.go
@@ -92,6 +92,7 @@ func TestSyncHandler(t *testing.T) {
 		name             string
 		key              string
 		claims           []*resourcev1alpha2.ResourceClaim
+		claimsInCache    []*resourcev1alpha2.ResourceClaim
 		pods             []*v1.Pod
 		podsLater        []*v1.Pod
 		templates        []*resourcev1alpha2.ResourceClaimTemplate
@@ -185,6 +186,18 @@
 			},
 			expectedMetrics: expectedMetrics{0, 0},
 		},
+		{
+			name:          "find-created-claim-in-cache",
+			pods:          []*v1.Pod{testPodWithResource},
+			key:           podKey(testPodWithResource),
+			claimsInCache: []*resourcev1alpha2.ResourceClaim{generatedTestClaim},
+			expectedStatuses: map[string][]v1.PodResourceClaimStatus{
+				testPodWithResource.Name: {
+					{Name: testPodWithResource.Spec.ResourceClaims[0].Name, ResourceClaimName: &generatedTestClaim.Name},
+				},
+			},
+			expectedMetrics: expectedMetrics{0, 0},
+		},
 		{
 			name: "no-such-pod",
 			key:  podKey(testPodWithResource),
@@ -345,6 +358,11 @@
 			informerFactory.WaitForCacheSync(ctx.Done())
 			cache.WaitForCacheSync(ctx.Done(), podInformer.Informer().HasSynced, claimInformer.Informer().HasSynced, templateInformer.Informer().HasSynced)
 
+			// Add claims that only exist in the mutation cache.
+			for _, claim := range tc.claimsInCache {
+				ec.claimCache.Mutation(claim)
+			}
+
 			// Simulate race: stop informers, add more pods that the controller doesn't know about.
 			stopInformers()
 			for _, pod := range tc.podsLater {
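Reviewer note (not part of the patch): for anyone unfamiliar with
cache.MutationCache, the following stand-alone sketch shows the pattern the
patch relies on: an indexer keyed by owning-pod UID, layered under a mutation
cache so that a freshly created claim is findable before the informer has
delivered it. The index name, object names, and UID below are illustrative;
the sketch assumes the resource.k8s.io/v1alpha2 API used by this patch and
client-go's tools/cache package.

package main

import (
	"fmt"
	"time"

	resourcev1alpha2 "k8s.io/api/resource/v1alpha2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/cache"
)

// podOwnerIndex mirrors claimPodOwnerIndexFunc above: it keys each claim by
// the UID of the pod that owns it through a controller reference.
func podOwnerIndex(obj interface{}) ([]string, error) {
	claim, ok := obj.(*resourcev1alpha2.ResourceClaim)
	if !ok {
		return nil, nil
	}
	var keys []string
	for _, owner := range claim.OwnerReferences {
		if owner.Controller != nil && *owner.Controller &&
			owner.APIVersion == "v1" && owner.Kind == "Pod" {
			keys = append(keys, string(owner.UID))
		}
	}
	return keys, nil
}

func main() {
	// Stand-in for the informer's indexer; in the controller this is
	// claimInformer.Informer().GetIndexer() with the extra index added.
	indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{
		"claim-pod-owner-index": podOwnerIndex,
	})

	// The same store serves as backing cache and index source, as in the
	// patch. includeAdds=true lets the mutation cache hold objects that
	// the underlying store has never seen.
	claimCache := cache.NewIntegerResourceVersionMutationCache(indexer, indexer, time.Hour, true)

	podUID := types.UID("pod-uid-1")
	isController := true
	claim := &resourcev1alpha2.ResourceClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:            "my-pod-my-claim",
			Namespace:       "default",
			ResourceVersion: "1",
			OwnerReferences: []metav1.OwnerReference{{
				APIVersion: "v1",
				Kind:       "Pod",
				Name:       "my-pod",
				UID:        podUID,
				Controller: &isController,
			}},
		},
	}

	// Record the freshly created claim; the informer has not seen it yet.
	claimCache.Mutation(claim)

	// A retried sync now finds the claim by owner UID and can reuse it
	// instead of creating a duplicate.
	claims, err := claimCache.ByIndex("claim-pod-owner-index", string(podUID))
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d claim(s) for pod UID %s\n", len(claims), podUID)
}

Because includeAdds is true, ByIndex consults the mutation cache's LRU in
addition to the indexer, which is exactly what lets a retry find a claim the
informer has not delivered yet; once the informer catches up, the integer
ResourceVersion comparison ensures the newer informer copy wins.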