diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go index 299a27d110b..ddf6ac58e52 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering.go @@ -152,7 +152,7 @@ func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod) boo // calculates the following for each existing pod on each node: // (1) Whether it has PodAntiAffinity // (2) Whether any AffinityTerm matches the incoming pod -func (pl *InterPodAffinity) getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo) topologyToMatchedTermCount { +func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo) topologyToMatchedTermCount { topoMaps := make([]topologyToMatchedTermCount, len(nodes)) index := int32(-1) processNode := func(i int) { @@ -170,7 +170,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels topoMaps[atomic.AddInt32(&index, 1)] = topoMap } } - pl.parallelizer.Until(context.Background(), len(nodes), processNode) + pl.parallelizer.Until(ctx, len(nodes), processNode) result := make(topologyToMatchedTermCount) for i := 0; i <= int(index); i++ { @@ -184,7 +184,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels // It returns a topologyToMatchedTermCount that are checked later by the affinity // predicate. With this topologyToMatchedTermCount available, the affinity predicate does not // need to check all the pods in the cluster. -func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) { +func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Context, podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) { affinityCounts := make(topologyToMatchedTermCount) antiAffinityCounts := make(topologyToMatchedTermCount) if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 { @@ -216,7 +216,7 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(podInfo *frame antiAffinityCountsList[k] = antiAffinity } } - pl.parallelizer.Until(context.Background(), len(allNodes), processNode) + pl.parallelizer.Until(ctx, len(allNodes), processNode) for i := 0; i <= int(index); i++ { affinityCounts.append(affinityCountsList[i]) @@ -257,8 +257,8 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework } s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister) - s.existingAntiAffinityCounts = pl.getExistingAntiAffinityCounts(pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods) - s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(s.podInfo, allNodes) + s.existingAntiAffinityCounts = pl.getExistingAntiAffinityCounts(ctx, pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods) + s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(ctx, s.podInfo, allNodes) cycleState.Write(preFilterStateKey, s) return nil diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go index c334c777b03..22c3e23fed6 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/filtering_test.go @@ -2443,7 +2443,7 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, nil) - gotAffinityPodsMap, gotAntiAffinityPodsMap := p.(*InterPodAffinity).getIncomingAffinityAntiAffinityCounts(framework.NewPodInfo(tt.pod), l) + gotAffinityPodsMap, gotAntiAffinityPodsMap := p.(*InterPodAffinity).getIncomingAffinityAntiAffinityCounts(ctx, framework.NewPodInfo(tt.pod), l) if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) { t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap) } diff --git a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go index d3685b6e6a1..f333bb6e3ca 100644 --- a/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go +++ b/pkg/scheduler/framework/plugins/interpodaffinity/scoring.go @@ -203,7 +203,7 @@ func (pl *InterPodAffinity) PreScore( topoScores[atomic.AddInt32(&index, 1)] = topoScore } } - pl.parallelizer.Until(context.Background(), len(allNodes), processNode) + pl.parallelizer.Until(pCtx, len(allNodes), processNode) for i := 0; i <= int(index); i++ { state.topologyScore.append(topoScores[i]) diff --git a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go index 4bc94fcdd25..4d6d472fd45 100644 --- a/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go +++ b/pkg/scheduler/framework/plugins/volumerestrictions/volume_restrictions.go @@ -122,7 +122,7 @@ func haveOverlap(a1, a2 []string) bool { func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status { if pl.enableReadWriteOncePod { - return pl.isReadWriteOncePodAccessModeConflict(pod) + return pl.isReadWriteOncePodAccessModeConflict(ctx, pod) } return framework.NewStatus(framework.Success) } @@ -131,7 +131,7 @@ func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framewo // This access mode restricts volume access to a single pod on a single node. Since only a single pod can // use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable. // TODO(#103132): Mark pod as Unschedulable and add preemption logic. -func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) *framework.Status { +func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.Context, pod *v1.Pod) *framework.Status { nodeInfos, err := pl.nodeInfoLister.NodeInfos().List() if err != nil { return framework.NewStatus(framework.Error, "error while getting node info") @@ -159,7 +159,7 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) pvcKeys = append(pvcKeys, key) } - ctx, cancel := context.WithCancel(context.Background()) + subCtx, cancel := context.WithCancel(ctx) var conflicts uint32 processNode := func(i int) { @@ -172,7 +172,7 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) } } } - pl.parallelizer.Until(ctx, len(nodeInfos), processNode) + pl.parallelizer.Until(subCtx, len(nodeInfos), processNode) // Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet. if conflicts > 0 {