Implements pod affinity NamespaceSelector feature

This commit is contained in:
Abdullah Gharaibeh
2021-03-04 07:30:24 -05:00
parent 60a4998395
commit c7fef196b6
21 changed files with 1051 additions and 710 deletions

View File

@@ -29,15 +29,19 @@ import (
"sync"
"time"
"k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
ktypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/informers"
listersv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/internal/heap"
"k8s.io/kubernetes/pkg/scheduler/metrics"
"k8s.io/kubernetes/pkg/scheduler/util"
@@ -95,8 +99,11 @@ type SchedulingQueue interface {
}
// NewSchedulingQueue initializes a priority queue as a new scheduling queue.
func NewSchedulingQueue(lessFn framework.LessFunc, opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, opts...)
func NewSchedulingQueue(
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option) SchedulingQueue {
return NewPriorityQueue(lessFn, informerFactory, opts...)
}
// NominatedNodeName returns nominated node name of a Pod.
@@ -148,6 +155,8 @@ type PriorityQueue struct {
// closed indicates that the queue is closed.
// It is mainly used to let Pop() exit its control loop while waiting for an item.
closed bool
nsLister listersv1.NamespaceLister
}
type priorityQueueOptions struct {
@@ -218,6 +227,7 @@ func newQueuedPodInfoForLookup(pod *v1.Pod, plugins ...string) *framework.Queued
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
@@ -248,6 +258,9 @@ func NewPriorityQueue(
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
if utilfeature.DefaultFeatureGate.Enabled(features.PodAffinityNamespaceSelector) {
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
}
return pq
}
@@ -559,17 +572,15 @@ func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.
// any affinity term that matches "pod".
// NOTE: this function assumes lock has been acquired in caller.
func (p *PriorityQueue) getUnschedulablePodsWithMatchingAffinityTerm(pod *v1.Pod) []*framework.QueuedPodInfo {
nsSelectorEnabled := p.nsLister != nil
var nsLabels labels.Set
if nsSelectorEnabled {
nsLabels = interpodaffinity.GetNamespaceLabelsSnapshot(pod.Namespace, p.nsLister)
}
var podsToMove []*framework.QueuedPodInfo
for _, pInfo := range p.unschedulableQ.podInfoMap {
up := pInfo.Pod
terms := util.GetPodAffinityTerms(up.Spec.Affinity)
for _, term := range terms {
namespaces := util.GetNamespacesFromPodAffinityTerm(up, &term)
selector, err := metav1.LabelSelectorAsSelector(term.LabelSelector)
if err != nil {
klog.ErrorS(err, "Error getting label selectors for pod", "pod", klog.KObj(up))
}
if util.PodMatchesTermsNamespaceAndSelector(pod, namespaces, selector) {
for _, term := range pInfo.RequiredAffinityTerms {
if term.Matches(pod, nsLabels, nsSelectorEnabled) {
podsToMove = append(podsToMove, pInfo)
break
}