Migrated pkg/scheduler/framework/plugins/interpodaffinity to use contextual logging

Mengjiao Liu 2023-06-01 18:23:29 +08:00
parent 82f7cf6010
commit 6d23da045f
6 changed files with 24 additions and 13 deletions
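
The changes follow the Kubernetes contextual logging migration (KEP-3077): functions that already receive a context.Context derive their logger from it via klog.FromContext instead of calling the global klog functions, and helpers without a context take a klog.Logger as an explicit parameter. A minimal sketch of the context-based half of the pattern, with illustrative names (only the klog API is taken from the diff):

package main

import (
	"context"

	"k8s.io/klog/v2"
)

// doWork is illustrative, not from this commit: it pulls the logger out of
// the context it already receives rather than logging through klog globals.
func doWork(ctx context.Context) {
	// FromContext returns the logger attached to ctx, falling back to the
	// global logger when none was attached.
	logger := klog.FromContext(ctx)
	logger.V(3).Info("doing work", "step", "example")
}

func main() {
	doWork(context.Background())
}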

pkg/scheduler/framework/plugins/interpodaffinity/filter.go

@@ -23,6 +23,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 )
@@ -247,7 +248,8 @@ func (pl *InterPodAffinity) PreFilter(ctx context.Context, cycleState *framework
 			return nil, framework.AsStatus(err)
 		}
 	}
-	s.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
+	logger := klog.FromContext(ctx)
+	s.namespaceLabels = GetNamespaceLabelsSnapshot(logger, pod.Namespace, pl.nsLister)
 	s.existingAntiAffinityCounts = pl.getExistingAntiAffinityCounts(ctx, pod, s.namespaceLabels, nodesWithRequiredAntiAffinityPods)
 	s.affinityCounts, s.antiAffinityCounts = pl.getIncomingAffinityAntiAffinityCounts(ctx, s.podInfo, allNodes)

pkg/scheduler/framework/plugins/interpodaffinity/filter_test.go

@@ -26,6 +26,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
@@ -537,7 +538,8 @@ func TestRequiredAffinitySingleNode(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			snapshot := cache.NewSnapshot(test.pods, []*v1.Node{test.node})
 			p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, namespaces)
@@ -948,7 +950,8 @@ func TestRequiredAffinityMultipleNodes(t *testing.T) {
 	for indexTest, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			snapshot := cache.NewSnapshot(test.pods, test.nodes)
 			p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot,
@@ -1223,7 +1226,8 @@ func TestPreFilterStateAddRemovePod(t *testing.T) {
 			// getMeta creates predicate meta data given the list of pods.
 			getState := func(pods []*v1.Pod) (*InterPodAffinity, *framework.CycleState, *preFilterState, *cache.Snapshot) {
 				snapshot := cache.NewSnapshot(pods, test.nodes)
-				ctx, cancel := context.WithCancel(context.Background())
+				_, ctx := ktesting.NewTestContext(t)
+				ctx, cancel := context.WithCancel(ctx)
 				defer cancel()
 				p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{}, snapshot, nil)
 				cycleState := framework.NewCycleState()
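
Both test files (filter_test.go above and scoring_test.go below) make the same substitution: ktesting.NewTestContext replaces context.Background, so each test's context carries a logger that writes through t.Log. A sketch of the resulting boilerplate in isolation (the test itself is illustrative, not part of this diff):

package example

import (
	"context"
	"testing"

	"k8s.io/klog/v2/ktesting"
)

func TestExample(t *testing.T) {
	// NewTestContext returns a per-test logger plus a context carrying it,
	// so code under test that calls klog.FromContext(ctx) logs into the
	// test's output and failures are easier to attribute.
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	_ = ctx // pass ctx into the code under test here
}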

pkg/scheduler/framework/plugins/interpodaffinity/plugin.go

@@ -122,12 +122,12 @@ func (pl *InterPodAffinity) mergeAffinityTermNamespacesIfNotEmpty(at *framework.
 // GetNamespaceLabelsSnapshot returns a snapshot of the labels associated with
 // the namespace.
-func GetNamespaceLabelsSnapshot(ns string, nsLister listersv1.NamespaceLister) (nsLabels labels.Set) {
+func GetNamespaceLabelsSnapshot(logger klog.Logger, ns string, nsLister listersv1.NamespaceLister) (nsLabels labels.Set) {
 	podNS, err := nsLister.Get(ns)
 	if err == nil {
 		// Create and return snapshot of the labels.
 		return labels.Merge(podNS.Labels, nil)
 	}
-	klog.V(3).InfoS("getting namespace, assuming empty set of namespace labels", "namespace", ns, "err", err)
+	logger.V(3).Info("getting namespace, assuming empty set of namespace labels", "namespace", ns, "err", err)
 	return
 }
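
The exported helper now requires the caller to supply the logger, and the global klog.V(3).InfoS call becomes logger.V(3).Info on that logger. A hedged sketch of a call site after this change (the wrapper function is hypothetical; the helper's new signature is from the diff above):

package example

import (
	"context"

	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
)

// namespaceLabelsFor is a hypothetical caller: it derives a logger from its
// context and hands it to the updated helper.
func namespaceLabelsFor(ctx context.Context, ns string, nsLister listersv1.NamespaceLister) {
	logger := klog.FromContext(ctx)
	_ = interpodaffinity.GetNamespaceLabelsSnapshot(logger, ns, nsLister)
}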

pkg/scheduler/framework/plugins/interpodaffinity/scoring.go

@@ -24,6 +24,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 )
@@ -184,7 +185,8 @@ func (pl *InterPodAffinity) PreScore(
 			return framework.AsStatus(fmt.Errorf("updating PreferredAntiAffinityTerms: %w", err))
 		}
 	}
-	state.namespaceLabels = GetNamespaceLabelsSnapshot(pod.Namespace, pl.nsLister)
+	logger := klog.FromContext(pCtx)
+	state.namespaceLabels = GetNamespaceLabelsSnapshot(logger, pod.Namespace, pl.nsLister)
 	topoScores := make([]scoreMap, len(allNodes))
 	index := int32(-1)

pkg/scheduler/framework/plugins/interpodaffinity/scoring_test.go

@@ -26,6 +26,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/klog/v2/ktesting"
 	"k8s.io/kubernetes/pkg/scheduler/apis/config"
 	"k8s.io/kubernetes/pkg/scheduler/framework"
 	plugintesting "k8s.io/kubernetes/pkg/scheduler/framework/plugins/testing"
@@ -777,7 +778,8 @@ func TestPreferredAffinity(t *testing.T) {
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			state := framework.NewCycleState()
 			p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{HardPodAffinityWeight: 1, IgnorePreferredTermsOfExistingPods: test.ignorePreferredTermsOfExistingPods}, cache.NewSnapshot(test.pods, test.nodes), namespaces)
@@ -944,7 +946,8 @@ func TestPreferredAffinityWithHardPodAffinitySymmetricWeight(t *testing.T) {
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			state := framework.NewCycleState()
 			p := plugintesting.SetupPluginWithInformers(ctx, t, New, &config.InterPodAffinityArgs{HardPodAffinityWeight: test.hardPodAffinityWeight}, cache.NewSnapshot(test.pods, test.nodes), namespaces)

pkg/scheduler/internal/queue/scheduling_queue.go

@@ -712,7 +712,7 @@ func (p *PriorityQueue) Delete(pod *v1.Pod) error {
 // may make pending pods with matching affinity terms schedulable.
 func (p *PriorityQueue) AssignedPodAdded(logger klog.Logger, pod *v1.Pod) {
 	p.lock.Lock()
-	p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodAdd)
+	p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodAdd)
 	p.lock.Unlock()
 }
@@ -737,7 +737,7 @@ func (p *PriorityQueue) AssignedPodUpdated(logger klog.Logger, pod *v1.Pod) {
 	if isPodResourcesResizedDown(pod) {
 		p.moveAllToActiveOrBackoffQueue(logger, AssignedPodUpdate, nil)
 	} else {
-		p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(pod), AssignedPodUpdate)
+		p.movePodsToActiveOrBackoffQueue(logger, p.getUnschedulablePodsWithMatchingAffinityTerm(logger, pod), AssignedPodUpdate)
 	}
 	p.lock.Unlock()
 }
@@ -806,8 +806,8 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(logger klog.Logger, podIn
 // getUnschedulablePodsWithMatchingAffinityTerm returns unschedulable pods which have
 // any affinity term that matches "pod".
 // NOTE: this function assumes lock has been acquired in caller.
-func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
-	nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
+func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(logger klog.Logger, pod *v1.Pod) []*framework.QueuedPodInfo {
+	nsLabels := interpodaffinity.GetNamespaceLabelsSnapshot(logger, pod.Namespace, p.nsLister)
 	var podsToMove []*framework.QueuedPodInfo
 	for _, pInfo := range p.unschedulablePods.podInfoMap {
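
This last file shows the other half of the migration: PriorityQueue methods carry no context.Context, so the logger travels as an explicit first parameter down to getUnschedulablePodsWithMatchingAffinityTerm. A minimal sketch of that threading pattern with illustrative names:

package example

import (
	"k8s.io/klog/v2"
)

// lookupLabels stands in for a helper like GetNamespaceLabelsSnapshot: with
// no context available, it receives the logger explicitly.
func lookupLabels(logger klog.Logger, ns string) {
	logger.V(3).Info("looking up namespace labels", "namespace", ns)
}

// caller stands in for a method like AssignedPodAdded: it already holds a
// logger and simply passes it on instead of relying on klog globals.
func caller(logger klog.Logger) {
	lookupLabels(logger, "kube-system")
}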