mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 03:11:40 +00:00
Merge pull request #108484 from Abirdcfly/fixctx
fix: some function should pass context parameter
This commit is contained in:
commit
d0d9a69414
@ -152,7 +152,7 @@ func podMatchesAllAffinityTerms(terms []framework.AffinityTerm, pod *v1.Pod) boo
|
||||
// calculates the following for each existing pod on each node:
|
||||
// (1) Whether it has PodAntiAffinity
|
||||
// (2) Whether any AffinityTerm matches the incoming pod
|
||||
func (pl *InterPodAffinity) getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
|
||||
func (pl *InterPodAffinity) getExistingAntiAffinityCounts(ctx context.Context, pod *v1.Pod, nsLabels labels.Set, nodes []*framework.NodeInfo) topologyToMatchedTermCount {
|
||||
topoMaps := make([]topologyToMatchedTermCount, len(nodes))
|
||||
index := int32(-1)
|
||||
processNode := func(i int) {
|
||||
@ -170,7 +170,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels
|
||||
topoMaps[atomic.AddInt32(&index, 1)] = topoMap
|
||||
}
|
||||
}
|
||||
pl.parallelizer.Until(context.Background(), len(nodes), processNode)
|
||||
pl.parallelizer.Until(ctx, len(nodes), processNode)
|
||||
|
||||
result := make(topologyToMatchedTermCount)
|
||||
for i := 0; i <= int(index); i++ {
|
||||
@ -184,7 +184,7 @@ func (pl *InterPodAffinity) getExistingAntiAffinityCounts(pod *v1.Pod, nsLabels
|
||||
// It returns a topologyToMatchedTermCount that are checked later by the affinity
|
||||
// predicate. With this topologyToMatchedTermCount available, the affinity predicate does not
|
||||
// need to check all the pods in the cluster.
|
||||
func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
|
||||
func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(ctx context.Context, podInfo *framework.PodInfo, allNodes []*framework.NodeInfo) (topologyToMatchedTermCount, topologyToMatchedTermCount) {
|
||||
affinityCounts := make(topologyToMatchedTermCount)
|
||||
antiAffinityCounts := make(topologyToMatchedTermCount)
|
||||
if len(podInfo.RequiredAffinityTerms) == 0 && len(podInfo.RequiredAntiAffinityTerms) == 0 {
|
||||
@ -216,7 +216,7 @@ func (pl *InterPodAffinity) getIncomingAffinityAntiAffinityCounts(podInfo *frame
|
||||
antiAffinityCountsList[k] = antiAffinity
|
||||
}
|
||||
}
|
||||
pl.parallelizer.Until(context.Background(), len(allNodes), processNode)
|
||||
pl.parallelizer.Until(ctx, len(allNodes), processNode)
|
||||
|
||||
for i := 0; i <= int(index); i++ {
|
||||
affinityCounts.append(affinityCountsList[i])
|
||||
@ -257,8 +257,8 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
|
||||
}
|
||||
s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
|
||||
|
||||
s.existingAntiAffinityCounts = pl.getExistingAntiAffinityCounts(pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods)
|
||||
s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(s.podInfo, allNodes)
|
||||
s.existingAntiAffinityCounts = pl.getExistingAntiAffinityCounts(ctx, pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods)
|
||||
s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(ctx, s.podInfo, allNodes)
|
||||
|
||||
cycleState.Write(preFilterStateKey, s)
|
||||
return nil
|
||||
|
@ -2443,7 +2443,7 @@ func TestGetTPMapMatchingIncomingAffinityAntiAffinity(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, nil)
|
||||
gotAffinityPodsMap, gotAntiAffinityPodsMap := p.(*InterPodAffinity).getIncomingAffinityAntiAffinityCounts(framework.NewPodInfo(tt.pod), l)
|
||||
gotAffinityPodsMap, gotAntiAffinityPodsMap := p.(*InterPodAffinity).getIncomingAffinityAntiAffinityCounts(ctx, framework.NewPodInfo(tt.pod), l)
|
||||
if !reflect.DeepEqual(gotAffinityPodsMap, tt.wantAffinityPodsMap) {
|
||||
t.Errorf("getTPMapMatchingIncomingAffinityAntiAffinity() gotAffinityPodsMap = %#v, want %#v", gotAffinityPodsMap, tt.wantAffinityPodsMap)
|
||||
}
|
||||
|
@ -203,7 +203,7 @@ func (pl *InterPodAffinity) PreScore(
|
||||
topoScores[atomic.AddInt32(&index, 1)] = topoScore
|
||||
}
|
||||
}
|
||||
pl.parallelizer.Until(context.Background(), len(allNodes), processNode)
|
||||
pl.parallelizer.Until(pCtx, len(allNodes), processNode)
|
||||
|
||||
for i := 0; i <= int(index); i++ {
|
||||
state.topologyScore.append(topoScores[i])
|
||||
|
@ -122,7 +122,7 @@ func haveOverlap(a1, a2 []string) bool {
|
||||
|
||||
func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod) *framework.Status {
|
||||
if pl.enableReadWriteOncePod {
|
||||
return pl.isReadWriteOncePodAccessModeConflict(pod)
|
||||
return pl.isReadWriteOncePodAccessModeConflict(ctx, pod)
|
||||
}
|
||||
return framework.NewStatus(framework.Success)
|
||||
}
|
||||
@ -131,7 +131,7 @@ func (pl *VolumeRestrictions) PreFilter(ctx context.Context, cycleState *framewo
|
||||
// This access mode restricts volume access to a single pod on a single node. Since only a single pod can
|
||||
// use a ReadWriteOncePod PVC, mark any other pods attempting to use this PVC as UnschedulableAndUnresolvable.
|
||||
// TODO(#103132): Mark pod as Unschedulable and add preemption logic.
|
||||
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod) *framework.Status {
|
||||
func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(ctx context.Context, pod *v1.Pod) *framework.Status {
|
||||
nodeInfos, err := pl.nodeInfoLister.NodeInfos().List()
|
||||
if err != nil {
|
||||
return framework.NewStatus(framework.Error, "error while getting node info")
|
||||
@ -159,7 +159,7 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod)
|
||||
pvcKeys = append(pvcKeys, key)
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
subCtx, cancel := context.WithCancel(ctx)
|
||||
var conflicts uint32
|
||||
|
||||
processNode := func(i int) {
|
||||
@ -172,7 +172,7 @@ func (pl *VolumeRestrictions) isReadWriteOncePodAccessModeConflict(pod *v1.Pod)
|
||||
}
|
||||
}
|
||||
}
|
||||
pl.parallelizer.Until(ctx, len(nodeInfos), processNode)
|
||||
pl.parallelizer.Until(subCtx, len(nodeInfos), processNode)
|
||||
|
||||
// Enforce ReadWriteOncePod access mode. This is also enforced during volume mount in kubelet.
|
||||
if conflicts > 0 {
|
||||
|
Loading…
Reference in New Issue
Block a user