mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-22 19:31:44 +00:00
Deprecate PredicateMetadata
This commit is contained in:
parent
ec4f3e3064
commit
770e51cd10
@ -457,8 +457,7 @@ func TestCSIVolumeCountPredicate(t *testing.T) {
|
||||
getFakeCSIPVCLister(test.filterName, "csi-sc", test.driverNames...),
|
||||
getFakeCSIStorageClassLister("csi-sc", test.driverNames[0]))
|
||||
|
||||
factory := &MetadataProducerFactory{}
|
||||
fits, reasons, err := pred(test.newPod, factory.GetPredicateMetadata(test.newPod, nil), node)
|
||||
fits, reasons, err := pred(test.newPod, nil, node)
|
||||
if err != nil {
|
||||
t.Errorf("Using allocatable [%s]%s: unexpected error: %v", test.filterName, test.test, err)
|
||||
}
|
||||
|
@ -852,8 +852,7 @@ func TestVolumeCountConflicts(t *testing.T) {
|
||||
getFakeStorageClassLister(test.filterName),
|
||||
getFakePVLister(test.filterName),
|
||||
getFakePVCLister(test.filterName))
|
||||
factory := &MetadataProducerFactory{}
|
||||
fits, reasons, err := pred(test.newPod, factory.GetPredicateMetadata(test.newPod, nil), node)
|
||||
fits, reasons, err := pred(test.newPod, nil, node)
|
||||
if err != nil {
|
||||
t.Errorf("Using allocatable [%s]%s: unexpected error: %v", test.filterName, test.test, err)
|
||||
}
|
||||
|
@ -28,20 +28,13 @@ import (
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog"
|
||||
priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
|
||||
schedulerlisters "k8s.io/kubernetes/pkg/scheduler/listers"
|
||||
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
|
||||
schedutil "k8s.io/kubernetes/pkg/scheduler/util"
|
||||
)
|
||||
|
||||
// Metadata interface represents anything that can access a predicate metadata.
|
||||
type Metadata interface {
|
||||
ShallowCopy() Metadata
|
||||
AddPod(addedPod *v1.Pod, node *v1.Node) error
|
||||
RemovePod(deletedPod *v1.Pod, node *v1.Node) error
|
||||
}
|
||||
|
||||
// MetadataProducer is a function that computes predicate metadata for a given pod.
|
||||
type MetadataProducer func(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) Metadata
|
||||
// DEPRECATED.
|
||||
type Metadata interface{}
|
||||
|
||||
// AntiAffinityTerm's topology key value used in predicate metadata
|
||||
type topologyPair struct {
|
||||
@ -229,49 +222,6 @@ func (m *PodAffinityMetadata) Clone() *PodAffinityMetadata {
|
||||
return ©
|
||||
}
|
||||
|
||||
// NOTE: When new fields are added/removed or logic is changed, please make sure that
|
||||
// RemovePod, AddPod, and ShallowCopy functions are updated to work with the new changes.
|
||||
// TODO(ahg-g): remove, not use anymore.
|
||||
type predicateMetadata struct {
|
||||
}
|
||||
|
||||
// Ensure that predicateMetadata implements algorithm.Metadata.
|
||||
var _ Metadata = &predicateMetadata{}
|
||||
|
||||
// predicateMetadataProducer function produces predicate metadata. It is stored in a global variable below
|
||||
// and used to modify the return values of MetadataProducer
|
||||
type predicateMetadataProducer func(pm *predicateMetadata)
|
||||
|
||||
var predicateMetadataProducers = make(map[string]predicateMetadataProducer)
|
||||
|
||||
// RegisterPredicateMetadataProducer registers a MetadataProducer.
|
||||
func RegisterPredicateMetadataProducer(predicateName string, precomp predicateMetadataProducer) {
|
||||
predicateMetadataProducers[predicateName] = precomp
|
||||
}
|
||||
|
||||
// EmptyMetadataProducer returns a no-op MetadataProducer type.
|
||||
func EmptyMetadataProducer(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) Metadata {
|
||||
return nil
|
||||
}
|
||||
|
||||
// MetadataProducerFactory is a factory to produce Metadata.
|
||||
type MetadataProducerFactory struct{}
|
||||
|
||||
// GetPredicateMetadata returns the predicateMetadata which will be used by various predicates.
|
||||
func (f *MetadataProducerFactory) GetPredicateMetadata(pod *v1.Pod, sharedLister schedulerlisters.SharedLister) Metadata {
|
||||
// If we cannot compute metadata, just return nil
|
||||
if pod == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
predicateMetadata := &predicateMetadata{}
|
||||
for predicateName, precomputeFunc := range predicateMetadataProducers {
|
||||
klog.V(10).Infof("Precompute: %v", predicateName)
|
||||
precomputeFunc(predicateMetadata)
|
||||
}
|
||||
return predicateMetadata
|
||||
}
|
||||
|
||||
// GetPodAffinityMetadata computes inter-pod affinity metadata.
|
||||
func GetPodAffinityMetadata(pod *v1.Pod, allNodes []*schedulernodeinfo.NodeInfo, havePodsWithAffinityNodes []*schedulernodeinfo.NodeInfo) (*PodAffinityMetadata, error) {
|
||||
// existingPodAntiAffinityMap will be used later for efficient check on existing pods' anti-affinity
|
||||
@ -460,25 +410,6 @@ func (m *PodTopologySpreadMetadata) Clone() *PodTopologySpreadMetadata {
|
||||
return &cp
|
||||
}
|
||||
|
||||
// RemovePod changes predicateMetadata assuming that the given `deletedPod` is
|
||||
// deleted from the system.
|
||||
func (meta *predicateMetadata) RemovePod(deletedPod *v1.Pod, node *v1.Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AddPod changes predicateMetadata assuming that the given `addedPod` is added to the
|
||||
// system.
|
||||
func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, node *v1.Node) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// ShallowCopy copies a metadata struct into a new struct and creates a copy of
|
||||
// its maps and slices, but it does not copy the contents of pointer values.
|
||||
func (meta *predicateMetadata) ShallowCopy() Metadata {
|
||||
newPredMeta := &predicateMetadata{}
|
||||
return (Metadata)(newPredMeta)
|
||||
}
|
||||
|
||||
// A processed version of v1.PodAffinityTerm.
|
||||
type affinityTermProperties struct {
|
||||
namespaces sets.String
|
||||
|
@ -567,8 +567,7 @@ func TestPodFitsHost(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
nodeInfo := schedulernodeinfo.NewNodeInfo()
|
||||
nodeInfo.SetNode(test.node)
|
||||
factory := &MetadataProducerFactory{}
|
||||
fits, reasons, err := PodFitsHost(test.pod, factory.GetPredicateMetadata(test.pod, nil), nodeInfo)
|
||||
fits, reasons, err := PodFitsHost(test.pod, nil, nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -708,8 +707,7 @@ func TestPodFitsHostPorts(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
factory := &MetadataProducerFactory{}
|
||||
fits, reasons, err := PodFitsHostPorts(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
fits, reasons, err := PodFitsHostPorts(test.pod, nil, test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -761,8 +759,7 @@ func TestGCEDiskConflicts(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
factory := &MetadataProducerFactory{}
|
||||
ok, reasons, err := NoDiskConflict(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
ok, reasons, err := NoDiskConflict(test.pod, nil, test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -817,8 +814,7 @@ func TestAWSDiskConflicts(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
factory := &MetadataProducerFactory{}
|
||||
ok, reasons, err := NoDiskConflict(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
ok, reasons, err := NoDiskConflict(test.pod, nil, test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -879,8 +875,7 @@ func TestRBDDiskConflicts(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
factory := &MetadataProducerFactory{}
|
||||
ok, reasons, err := NoDiskConflict(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
ok, reasons, err := NoDiskConflict(test.pod, nil, test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -941,8 +936,7 @@ func TestISCSIDiskConflicts(t *testing.T) {
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
factory := &MetadataProducerFactory{}
|
||||
ok, reasons, err := NoDiskConflict(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
ok, reasons, err := NoDiskConflict(test.pod, nil, test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -1643,8 +1637,7 @@ func TestPodFitsSelector(t *testing.T) {
|
||||
nodeInfo := schedulernodeinfo.NewNodeInfo()
|
||||
nodeInfo.SetNode(&node)
|
||||
|
||||
factory := &MetadataProducerFactory{}
|
||||
fits, reasons, err := PodMatchNodeSelector(test.pod, factory.GetPredicateMetadata(test.pod, nil), nodeInfo)
|
||||
fits, reasons, err := PodMatchNodeSelector(test.pod, nil, nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -1707,8 +1700,7 @@ func TestNodeLabelPresence(t *testing.T) {
|
||||
nodeInfo.SetNode(&node)
|
||||
|
||||
labelChecker := NodeLabelChecker{test.presentLabels, test.absentLabels}
|
||||
factory := &MetadataProducerFactory{}
|
||||
fits, reasons, err := labelChecker.CheckNodeLabelPresence(test.pod, factory.GetPredicateMetadata(test.pod, nil), nodeInfo)
|
||||
fits, reasons, err := labelChecker.CheckNodeLabelPresence(test.pod, nil, nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -1808,8 +1800,7 @@ func TestRunGeneralPredicates(t *testing.T) {
|
||||
for _, test := range resourceTests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
test.nodeInfo.SetNode(test.node)
|
||||
factory := &MetadataProducerFactory{}
|
||||
fits, reasons, err := GeneralPredicates(test.pod, factory.GetPredicateMetadata(test.pod, nil), test.nodeInfo)
|
||||
fits, reasons, err := GeneralPredicates(test.pod, nil, test.nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -2012,8 +2003,7 @@ func TestPodToleratesTaints(t *testing.T) {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
nodeInfo := schedulernodeinfo.NewNodeInfo()
|
||||
nodeInfo.SetNode(&test.node)
|
||||
factory := &MetadataProducerFactory{}
|
||||
fits, reasons, err := PodToleratesNodeTaints(test.pod, factory.GetPredicateMetadata(test.pod, nil), nodeInfo)
|
||||
fits, reasons, err := PodToleratesNodeTaints(test.pod, nil, nodeInfo)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
@ -51,9 +51,6 @@ type AlgorithmFactoryArgs struct {
|
||||
// PriorityMetadataProducerFactory produces MetadataProducer from the given args.
|
||||
type PriorityMetadataProducerFactory func(AlgorithmFactoryArgs) priorities.MetadataProducer
|
||||
|
||||
// PredicateMetadataProducerFactory produces MetadataProducer from the given args.
|
||||
type PredicateMetadataProducerFactory func(AlgorithmFactoryArgs) predicates.MetadataProducer
|
||||
|
||||
// FitPredicateFactory produces a FitPredicate from the given args.
|
||||
type FitPredicateFactory func(AlgorithmFactoryArgs) predicates.FitPredicate
|
||||
|
||||
@ -78,8 +75,7 @@ var (
|
||||
algorithmProviderMap = make(map[string]AlgorithmProviderConfig)
|
||||
|
||||
// Registered metadata producers
|
||||
priorityMetadataProducerFactory PriorityMetadataProducerFactory
|
||||
predicateMetadataProducerFactory PredicateMetadataProducerFactory
|
||||
priorityMetadataProducerFactory PriorityMetadataProducerFactory
|
||||
)
|
||||
|
||||
// AlgorithmProviderConfig is used to store the configuration of algorithm providers.
|
||||
@ -316,13 +312,6 @@ func RegisterPriorityMetadataProducerFactory(f PriorityMetadataProducerFactory)
|
||||
priorityMetadataProducerFactory = f
|
||||
}
|
||||
|
||||
// RegisterPredicateMetadataProducerFactory registers a MetadataProducer.
|
||||
func RegisterPredicateMetadataProducerFactory(f PredicateMetadataProducerFactory) {
|
||||
schedulerFactoryMutex.Lock()
|
||||
defer schedulerFactoryMutex.Unlock()
|
||||
predicateMetadataProducerFactory = f
|
||||
}
|
||||
|
||||
// RegisterPriorityMapReduceFunction registers a priority function with the algorithm registry. Returns the name,
|
||||
// with which the function was registered.
|
||||
func RegisterPriorityMapReduceFunction(
|
||||
@ -545,16 +534,6 @@ func getPriorityMetadataProducer(args AlgorithmFactoryArgs) (priorities.Metadata
|
||||
return priorityMetadataProducerFactory(args), nil
|
||||
}
|
||||
|
||||
func getPredicateMetadataProducer(args AlgorithmFactoryArgs) (predicates.MetadataProducer, error) {
|
||||
schedulerFactoryMutex.Lock()
|
||||
defer schedulerFactoryMutex.Unlock()
|
||||
|
||||
if predicateMetadataProducerFactory == nil {
|
||||
return predicates.EmptyMetadataProducer, nil
|
||||
}
|
||||
return predicateMetadataProducerFactory(args), nil
|
||||
}
|
||||
|
||||
func getPriorityFunctionConfigs(names sets.String, args AlgorithmFactoryArgs) ([]priorities.PriorityConfig, error) {
|
||||
schedulerFactoryMutex.RLock()
|
||||
defer schedulerFactoryMutex.RUnlock()
|
||||
|
@ -22,13 +22,6 @@ import (
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Register functions that extract metadata used by predicates computations.
|
||||
scheduler.RegisterPredicateMetadataProducerFactory(
|
||||
func(args scheduler.AlgorithmFactoryArgs) predicates.MetadataProducer {
|
||||
f := &predicates.MetadataProducerFactory{}
|
||||
return f.GetPredicateMetadata
|
||||
})
|
||||
|
||||
// IMPORTANT NOTES for predicate developers:
|
||||
// Registers predicates and priorities that are not enabled by default, but user can pick when creating their
|
||||
// own set of priorities/predicates.
|
||||
|
@ -34,7 +34,6 @@ import (
|
||||
clientsetfake "k8s.io/client-go/kubernetes/fake"
|
||||
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
|
||||
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/apis/config"
|
||||
extenderv1 "k8s.io/kubernetes/pkg/scheduler/apis/extender/v1"
|
||||
@ -586,7 +585,6 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
|
||||
cache,
|
||||
queue,
|
||||
nil,
|
||||
predicates.EmptyMetadataProducer,
|
||||
priorities.EmptyMetadataProducer,
|
||||
emptySnapshot,
|
||||
fwk,
|
||||
|
@ -129,10 +129,6 @@ type ScheduleAlgorithm interface {
|
||||
// Prioritizers returns a slice of priority config. This is exposed for
|
||||
// testing.
|
||||
Extenders() []algorithm.SchedulerExtender
|
||||
// GetPredicateMetadataProducer returns the predicate metadata producer. This is needed
|
||||
// for cluster autoscaler integration.
|
||||
// TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler.
|
||||
PredicateMetadataProducer() predicates.MetadataProducer
|
||||
// Snapshot snapshots scheduler cache and node infos. This is needed
|
||||
// for cluster autoscaler integration.
|
||||
// TODO(#85691): remove this once CA migrates to creating a Framework instead of a full scheduler.
|
||||
@ -158,7 +154,6 @@ type genericScheduler struct {
|
||||
schedulingQueue internalqueue.SchedulingQueue
|
||||
predicates map[string]predicates.FitPredicate
|
||||
priorityMetaProducer priorities.MetadataProducer
|
||||
predicateMetaProducer predicates.MetadataProducer
|
||||
prioritizers []priorities.PriorityConfig
|
||||
framework framework.Framework
|
||||
extenders []algorithm.SchedulerExtender
|
||||
@ -186,12 +181,6 @@ func (g *genericScheduler) Framework() framework.Framework {
|
||||
return g.framework
|
||||
}
|
||||
|
||||
// PredicateMetadataProducer returns the predicate metadata producer. This is needed
|
||||
// for cluster autoscaler integration.
|
||||
func (g *genericScheduler) PredicateMetadataProducer() predicates.MetadataProducer {
|
||||
return g.predicateMetaProducer
|
||||
}
|
||||
|
||||
// Schedule tries to schedule the given pod to one of the nodes in the node list.
|
||||
// If it succeeds, it will return the name of the node.
|
||||
// If it fails, it will return a FitError error with reasons.
|
||||
@ -436,7 +425,7 @@ func (g *genericScheduler) processPreemptionWithExtenders(
|
||||
// priority of the given "pod" and are nominated to run on the given node.
|
||||
// Note: We could possibly check if the nominated lower priority pods still fit
|
||||
// and return those that no longer fit, but that would require lots of
|
||||
// manipulation of NodeInfo and PredicateMeta per nominated pod. It may not be
|
||||
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
|
||||
// worth the complexity, especially because we generally expect to have a very
|
||||
// small number of nominated pods per node.
|
||||
func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod {
|
||||
@ -504,23 +493,11 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
|
||||
)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
|
||||
// We can use the same metadata producer for all nodes.
|
||||
meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot)
|
||||
state.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: meta})
|
||||
|
||||
checkNode := func(i int) {
|
||||
// We check the nodes starting from where we left off in the previous scheduling cycle,
|
||||
// this is to make sure all nodes have the same chance of being examined across pods.
|
||||
nodeInfo := g.nodeInfoSnapshot.NodeInfoList[(g.nextStartNodeIndex+i)%allNodes]
|
||||
fits, _, status, err := g.podFitsOnNode(
|
||||
ctx,
|
||||
state,
|
||||
pod,
|
||||
meta,
|
||||
nodeInfo,
|
||||
g.alwaysCheckAllPredicates,
|
||||
)
|
||||
fits, _, status, err := g.podFitsOnNode(ctx, state, pod, nodeInfo)
|
||||
if err != nil {
|
||||
errCh.SendErrorWithCancel(err, cancel)
|
||||
return
|
||||
@ -587,44 +564,32 @@ func (g *genericScheduler) findNodesThatFit(ctx context.Context, state *framewor
|
||||
}
|
||||
|
||||
// addNominatedPods adds pods with equal or greater priority which are nominated
|
||||
// to run on the node given in nodeInfo to meta and nodeInfo. It returns 1) whether
|
||||
// any pod was added, 2) augmented metadata, 3) augmented CycleState 4) augmented nodeInfo.
|
||||
// TODO(Huang-Wei): remove 'meta predicates.Metadata' from the signature.
|
||||
func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, meta predicates.Metadata, state *framework.CycleState,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo) (bool, predicates.Metadata,
|
||||
*framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
|
||||
// to run on the node. It returns 1) whether any pod was added, 2) augmented cycleState,
|
||||
// 3) augmented nodeInfo.
|
||||
func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, state *framework.CycleState,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo) (bool, *framework.CycleState, *schedulernodeinfo.NodeInfo, error) {
|
||||
if g.schedulingQueue == nil || nodeInfo == nil || nodeInfo.Node() == nil {
|
||||
// This may happen only in tests.
|
||||
return false, meta, state, nodeInfo, nil
|
||||
return false, state, nodeInfo, nil
|
||||
}
|
||||
nominatedPods := g.schedulingQueue.NominatedPodsForNode(nodeInfo.Node().Name)
|
||||
if len(nominatedPods) == 0 {
|
||||
return false, meta, state, nodeInfo, nil
|
||||
return false, state, nodeInfo, nil
|
||||
}
|
||||
nodeInfoOut := nodeInfo.Clone()
|
||||
var metaOut predicates.Metadata
|
||||
if meta != nil {
|
||||
metaOut = meta.ShallowCopy()
|
||||
}
|
||||
stateOut := state.Clone()
|
||||
stateOut.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaOut})
|
||||
podsAdded := false
|
||||
for _, p := range nominatedPods {
|
||||
if podutil.GetPodPriority(p) >= podutil.GetPodPriority(pod) && p.UID != pod.UID {
|
||||
nodeInfoOut.AddPod(p)
|
||||
if metaOut != nil {
|
||||
if err := metaOut.AddPod(p, nodeInfoOut.Node()); err != nil {
|
||||
return false, meta, state, nodeInfo, err
|
||||
}
|
||||
}
|
||||
status := g.framework.RunPreFilterExtensionAddPod(ctx, stateOut, pod, p, nodeInfoOut)
|
||||
if !status.IsSuccess() {
|
||||
return false, meta, state, nodeInfo, status.AsError()
|
||||
return false, state, nodeInfo, status.AsError()
|
||||
}
|
||||
podsAdded = true
|
||||
}
|
||||
}
|
||||
return podsAdded, metaOut, stateOut, nodeInfoOut, nil
|
||||
return podsAdded, stateOut, nodeInfoOut, nil
|
||||
}
|
||||
|
||||
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
|
||||
@ -636,21 +601,19 @@ func (g *genericScheduler) addNominatedPods(ctx context.Context, pod *v1.Pod, me
|
||||
// pods nominated to run on the node.
|
||||
// When it is called from Preempt, we should remove the victims of preemption and
|
||||
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
|
||||
// It removes victims from meta and NodeInfo before calling this function.
|
||||
// It removes victims from PreFilter state and NodeInfo before calling this function.
|
||||
func (g *genericScheduler) podFitsOnNode(
|
||||
ctx context.Context,
|
||||
state *framework.CycleState,
|
||||
pod *v1.Pod,
|
||||
meta predicates.Metadata,
|
||||
info *schedulernodeinfo.NodeInfo,
|
||||
alwaysCheckAllPredicates bool,
|
||||
) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
|
||||
var failedPredicates []predicates.PredicateFailureReason
|
||||
var status *framework.Status
|
||||
|
||||
podsAdded := false
|
||||
// We run predicates twice in some cases. If the node has greater or equal priority
|
||||
// nominated pods, we run them when those pods are added to meta and nodeInfo.
|
||||
// nominated pods, we run them when those pods are added to PreFilter state and nodeInfo.
|
||||
// If all predicates succeed in this pass, we run them again when these
|
||||
// nominated pods are not added. This second pass is necessary because some
|
||||
// predicates such as inter-pod affinity may not pass without the nominated pods.
|
||||
@ -672,7 +635,7 @@ func (g *genericScheduler) podFitsOnNode(
|
||||
nodeInfoToUse := info
|
||||
if i == 0 {
|
||||
var err error
|
||||
podsAdded, _, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, meta, state, info)
|
||||
podsAdded, stateToUse, nodeInfoToUse, err = g.addNominatedPods(ctx, pod, state, info)
|
||||
if err != nil {
|
||||
return false, []predicates.PredicateFailureReason{}, nil, err
|
||||
}
|
||||
@ -929,21 +892,14 @@ func (g *genericScheduler) selectNodesForPreemption(
|
||||
nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
|
||||
var resultLock sync.Mutex
|
||||
|
||||
// We can use the same metadata producer for all nodes.
|
||||
meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot)
|
||||
checkNode := func(i int) {
|
||||
nodeName := potentialNodes[i].Name
|
||||
if g.nodeInfoSnapshot.NodeInfoMap[nodeName] == nil {
|
||||
return
|
||||
}
|
||||
nodeInfoCopy := g.nodeInfoSnapshot.NodeInfoMap[nodeName].Clone()
|
||||
var metaCopy predicates.Metadata
|
||||
if meta != nil {
|
||||
metaCopy = meta.ShallowCopy()
|
||||
}
|
||||
stateCopy := state.Clone()
|
||||
stateCopy.Write(migration.PredicatesStateKey, &migration.PredicatesStateData{Reference: metaCopy})
|
||||
pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, metaCopy, nodeInfoCopy, pdbs)
|
||||
pods, numPDBViolations, fits := g.selectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
|
||||
if fits {
|
||||
resultLock.Lock()
|
||||
victims := extenderv1.Victims{
|
||||
@ -1016,7 +972,6 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
ctx context.Context,
|
||||
state *framework.CycleState,
|
||||
pod *v1.Pod,
|
||||
meta predicates.Metadata,
|
||||
nodeInfo *schedulernodeinfo.NodeInfo,
|
||||
pdbs []*policy.PodDisruptionBudget,
|
||||
) ([]*v1.Pod, int, bool) {
|
||||
@ -1026,11 +981,6 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
if err := nodeInfo.RemovePod(rp); err != nil {
|
||||
return err
|
||||
}
|
||||
if meta != nil {
|
||||
if err := meta.RemovePod(rp, nodeInfo.Node()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
status := g.framework.RunPreFilterExtensionRemovePod(ctx, state, pod, rp, nodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
return status.AsError()
|
||||
@ -1039,11 +989,6 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
}
|
||||
addPod := func(ap *v1.Pod) error {
|
||||
nodeInfo.AddPod(ap)
|
||||
if meta != nil {
|
||||
if err := meta.AddPod(ap, nodeInfo.Node()); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
status := g.framework.RunPreFilterExtensionAddPod(ctx, state, pod, ap, nodeInfo)
|
||||
if !status.IsSuccess() {
|
||||
return status.AsError()
|
||||
@ -1067,7 +1012,7 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
// inter-pod affinity to one or more victims, but we have decided not to
|
||||
// support this case for performance reasons. Having affinity to lower
|
||||
// priority pods is not a recommended configuration anyway.
|
||||
if fits, _, _, err := g.podFitsOnNode(ctx, state, pod, meta, nodeInfo, false); !fits {
|
||||
if fits, _, _, err := g.podFitsOnNode(ctx, state, pod, nodeInfo); !fits {
|
||||
if err != nil {
|
||||
klog.Warningf("Encountered error while selecting victims on node %v: %v", nodeInfo.Node().Name, err)
|
||||
}
|
||||
@ -1085,7 +1030,7 @@ func (g *genericScheduler) selectVictimsOnNode(
|
||||
if err := addPod(p); err != nil {
|
||||
return false, err
|
||||
}
|
||||
fits, _, _, _ := g.podFitsOnNode(ctx, state, pod, meta, nodeInfo, false)
|
||||
fits, _, _, _ := g.podFitsOnNode(ctx, state, pod, nodeInfo)
|
||||
if !fits {
|
||||
if err := removePod(p); err != nil {
|
||||
return false, err
|
||||
@ -1194,7 +1139,6 @@ func NewGenericScheduler(
|
||||
cache internalcache.Cache,
|
||||
podQueue internalqueue.SchedulingQueue,
|
||||
predicates map[string]predicates.FitPredicate,
|
||||
predicateMetaProducer predicates.MetadataProducer,
|
||||
priorityMetaProducer priorities.MetadataProducer,
|
||||
nodeInfoSnapshot *nodeinfosnapshot.Snapshot,
|
||||
framework framework.Framework,
|
||||
@ -1210,7 +1154,6 @@ func NewGenericScheduler(
|
||||
cache: cache,
|
||||
schedulingQueue: podQueue,
|
||||
predicates: predicates,
|
||||
predicateMetaProducer: predicateMetaProducer,
|
||||
priorityMetaProducer: priorityMetaProducer,
|
||||
framework: framework,
|
||||
extenders: extenders,
|
||||
|
@ -789,7 +789,6 @@ func TestGenericScheduler(t *testing.T) {
|
||||
cache,
|
||||
internalqueue.NewSchedulingQueue(nil),
|
||||
nil,
|
||||
algorithmpredicates.EmptyMetadataProducer,
|
||||
// test.prioritizers,
|
||||
priorities.EmptyMetadataProducer,
|
||||
snapshot,
|
||||
@ -837,7 +836,6 @@ func makeScheduler(nodes []*v1.Node, fns ...st.RegisterPluginFunc) *genericSched
|
||||
cache,
|
||||
internalqueue.NewSchedulingQueue(nil),
|
||||
nil,
|
||||
algorithmpredicates.EmptyMetadataProducer,
|
||||
priorities.EmptyMetadataProducer,
|
||||
emptySnapshot,
|
||||
fwk,
|
||||
@ -966,7 +964,6 @@ func TestFindFitPredicateCallCounts(t *testing.T) {
|
||||
cache,
|
||||
queue,
|
||||
nil,
|
||||
algorithmpredicates.EmptyMetadataProducer,
|
||||
priorities.EmptyMetadataProducer,
|
||||
emptySnapshot,
|
||||
fwk,
|
||||
@ -1123,7 +1120,7 @@ func TestZeroRequest(t *testing.T) {
|
||||
|
||||
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, test.nodes))
|
||||
|
||||
metaDataProducer := priorities.NewMetadataFactory(
|
||||
metadataProducer := priorities.NewMetadataFactory(
|
||||
informerFactory.Core().V1().Services().Lister(),
|
||||
informerFactory.Core().V1().ReplicationControllers().Lister(),
|
||||
informerFactory.Apps().V1().ReplicaSets().Lister(),
|
||||
@ -1131,7 +1128,7 @@ func TestZeroRequest(t *testing.T) {
|
||||
1,
|
||||
)
|
||||
|
||||
metaData := metaDataProducer(test.pod, test.nodes, snapshot)
|
||||
metadata := metadataProducer(test.pod, test.nodes, snapshot)
|
||||
|
||||
registry := framework.Registry{}
|
||||
plugins := &schedulerapi.Plugins{
|
||||
@ -1163,8 +1160,7 @@ func TestZeroRequest(t *testing.T) {
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
nil,
|
||||
metaDataProducer,
|
||||
metadataProducer,
|
||||
emptySnapshot,
|
||||
fwk,
|
||||
[]algorithm.SchedulerExtender{},
|
||||
@ -1181,7 +1177,7 @@ func TestZeroRequest(t *testing.T) {
|
||||
context.Background(),
|
||||
framework.NewCycleState(),
|
||||
test.pod,
|
||||
metaData,
|
||||
metadata,
|
||||
test.nodes,
|
||||
)
|
||||
if err != nil {
|
||||
@ -1607,12 +1603,10 @@ func TestSelectNodesForPreemption(t *testing.T) {
|
||||
snapshot := nodeinfosnapshot.NewSnapshot(nodeinfosnapshot.CreateNodeInfoMap(test.pods, nodes))
|
||||
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs, framework.WithSnapshotSharedLister(snapshot))
|
||||
|
||||
factory := &algorithmpredicates.MetadataProducerFactory{}
|
||||
scheduler := NewGenericScheduler(
|
||||
nil,
|
||||
internalqueue.NewSchedulingQueue(nil),
|
||||
nil,
|
||||
factory.GetPredicateMetadata,
|
||||
priorities.EmptyMetadataProducer,
|
||||
snapshot,
|
||||
fwk,
|
||||
@ -1863,11 +1857,9 @@ func TestPickOneNodeForPreemption(t *testing.T) {
|
||||
test.registerFilterPlugin(®istry, plugins, pluginConfigs)
|
||||
fwk, _ := framework.NewFramework(registry, plugins, pluginConfigs, framework.WithSnapshotSharedLister(snapshot))
|
||||
|
||||
factory := algorithmpredicates.MetadataProducerFactory{}
|
||||
g := &genericScheduler{
|
||||
framework: fwk,
|
||||
nodeInfoSnapshot: snapshot,
|
||||
predicateMetaProducer: factory.GetPredicateMetadata,
|
||||
framework: fwk,
|
||||
nodeInfoSnapshot: snapshot,
|
||||
}
|
||||
assignDefaultStartTime(test.pods)
|
||||
|
||||
@ -2363,7 +2355,6 @@ func TestPreempt(t *testing.T) {
|
||||
cache,
|
||||
internalqueue.NewSchedulingQueue(nil),
|
||||
nil,
|
||||
algorithmpredicates.EmptyMetadataProducer,
|
||||
priorities.EmptyMetadataProducer,
|
||||
snapshot,
|
||||
fwk,
|
||||
|
@ -240,11 +240,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
predicateMetaProducer, err := getPredicateMetadataProducer(c.algorithmFactoryArgs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Combine all framework configurations. If this results in any duplication, framework
|
||||
// instantiation should fail.
|
||||
var plugins schedulerapi.Plugins
|
||||
@ -287,7 +282,6 @@ func (c *Configurator) CreateFromKeys(predicateKeys, priorityKeys sets.String, e
|
||||
c.schedulerCache,
|
||||
podQueue,
|
||||
predicateFuncs,
|
||||
predicateMetaProducer,
|
||||
priorityMetaProducer,
|
||||
c.nodeInfoSnapshot,
|
||||
framework,
|
||||
|
@ -23,9 +23,6 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// PredicatesStateKey is the key in CycleState to PredicateStateData
|
||||
PredicatesStateKey = "predicates"
|
||||
|
||||
// PrioritiesStateKey is the key in CycleState to PrioritiesStateData
|
||||
PrioritiesStateKey = "priorities"
|
||||
)
|
||||
@ -62,26 +59,6 @@ func ErrorToFrameworkStatus(err error) *framework.Status {
|
||||
return nil
|
||||
}
|
||||
|
||||
// PredicatesStateData is a pointer to Metadata. In the normal case, StateData is supposed to
|
||||
// be generated and stored in CycleState by a framework plugin (like a PreFilter pre-computing data for
|
||||
// its corresponding Filter). However, during migration, the scheduler will inject a pointer to
|
||||
// Metadata into CycleState. This "hack" is necessary because during migration Filters that implement
|
||||
// predicates functionality will be calling into the existing predicate functions, and need
|
||||
// to pass Metadata.
|
||||
type PredicatesStateData struct {
|
||||
Reference interface{}
|
||||
}
|
||||
|
||||
// Clone is supposed to make a copy of the data, but since this is just a pointer, we are practically
|
||||
// just copying the pointer. This is ok because the actual reference to the Metadata
|
||||
// copy that is made by generic_scheduler during preemption cycle will be injected again outside
|
||||
// the framework.
|
||||
func (p *PredicatesStateData) Clone() framework.StateData {
|
||||
return &PredicatesStateData{
|
||||
Reference: p.Reference,
|
||||
}
|
||||
}
|
||||
|
||||
// PrioritiesStateData is a pointer to PrioritiesMetadata.
|
||||
type PrioritiesStateData struct {
|
||||
Reference interface{}
|
||||
@ -109,28 +86,3 @@ func PriorityMetadata(state *framework.CycleState) interface{} {
|
||||
}
|
||||
return meta
|
||||
}
|
||||
|
||||
// PredicateMetadata returns predicate metadata stored in CycleState.
|
||||
func PredicateMetadata(state *framework.CycleState) interface{} {
|
||||
if state == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var meta interface{}
|
||||
if s, err := state.Read(PredicatesStateKey); err == nil {
|
||||
meta = s.(*PredicatesStateData).Reference
|
||||
} else {
|
||||
klog.Errorf("reading key %q from CycleState, continuing without metadata: %v", PredicatesStateKey, err)
|
||||
}
|
||||
return meta
|
||||
}
|
||||
|
||||
// CovertStateRefToPredMeta checks if 'stateRef' is nil, if it is, return nil;
|
||||
// otherwise covert it to predicates metadata and return.
|
||||
func CovertStateRefToPredMeta(stateRef interface{}) (predicates.Metadata, bool) {
|
||||
if stateRef == nil {
|
||||
return nil, true
|
||||
}
|
||||
meta, ok := stateRef.(predicates.Metadata)
|
||||
return meta, ok
|
||||
}
|
||||
|
@ -201,7 +201,7 @@ func getPreFilterState(cycleState *framework.CycleState) (*preFilterState, error
|
||||
if err != nil {
|
||||
// The metadata wasn't pre-computed in prefilter. We ignore the error for now since
|
||||
// Filter is able to handle that by computing it again.
|
||||
klog.Error(fmt.Sprintf("reading %q from cycleState: %v", preFilterStateKey, err))
|
||||
klog.V(5).Infof(fmt.Sprintf("reading %q from cycleState: %v", preFilterStateKey, err))
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
|
@ -156,10 +156,6 @@ type mockScheduler struct {
|
||||
err error
|
||||
}
|
||||
|
||||
func (es mockScheduler) PredicateMetadataProducer() predicates.MetadataProducer {
|
||||
return nil
|
||||
|
||||
}
|
||||
func (es mockScheduler) Schedule(ctx context.Context, state *framework.CycleState, pod *v1.Pod) (core.ScheduleResult, error) {
|
||||
return es.result, es.err
|
||||
}
|
||||
@ -691,7 +687,6 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
|
||||
scache,
|
||||
internalqueue.NewSchedulingQueue(nil),
|
||||
nil,
|
||||
predicates.EmptyMetadataProducer,
|
||||
priorities.EmptyMetadataProducer,
|
||||
nodeinfosnapshot.NewEmptySnapshot(),
|
||||
fwk,
|
||||
@ -749,7 +744,6 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
|
||||
scache,
|
||||
queue,
|
||||
nil,
|
||||
predicates.EmptyMetadataProducer,
|
||||
priorities.EmptyMetadataProducer,
|
||||
nodeinfosnapshot.NewEmptySnapshot(),
|
||||
fwk,
|
||||
|
Loading…
Reference in New Issue
Block a user