Merge pull request #91037 from Huang-Wei/prefactor-PreemptExtender

Refactor preemption extender logic and move SchedulerExtender interface to framework pkg
Kubernetes Prow Robot 2020-05-15 02:48:30 -07:00 committed by GitHub
commit 57b79e3fb4
10 changed files with 182 additions and 171 deletions
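At a glance, the change re-keys the preemption victim maps by node name instead of by *v1.Node pointer and moves the extender contract into the scheduler framework package. The short standalone Go sketch below (victims is a simplified stand-in, not the real extenderv1.Victims type) illustrates the name-keyed data shape that preemption-aware extenders now receive:

package main

import "fmt"

// victims is a simplified stand-in for extenderv1.Victims, reduced to what
// this sketch needs: the pods to evict and the number of PDB violations.
type victims struct {
	podNames         []string
	numPDBViolations int64
}

func main() {
	// After this refactor, candidate nodes and their victims are keyed by node
	// name rather than by *v1.Node, so callers no longer need live node objects
	// just to address a candidate.
	nodeNameToVictims := map[string]*victims{
		"node-a": {podNames: []string{"low-prio-1"}, numPDBViolations: 0},
		"node-b": {podNames: []string{"low-prio-2", "low-prio-3"}, numPDBViolations: 1},
	}
	for name, v := range nodeNameToVictims {
		fmt.Printf("%s: %d victim(s), %d PDB violation(s)\n", name, len(v.podNames), v.numPDBViolations)
	}
}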

View File

@ -38,55 +38,7 @@ const (
DefaultExtenderTimeout = 5 * time.Second
)
// SchedulerExtender is an interface for external processes to influence scheduling
// decisions made by Kubernetes. This is typically needed for resources not directly
// managed by Kubernetes.
type SchedulerExtender interface {
// Name returns a unique name that identifies the extender.
Name() string
// Filter based on extender-implemented predicate functions. The filtered list is
// expected to be a subset of the supplied list. failedNodesMap optionally contains
// the list of failed nodes and failure reasons.
Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
// Prioritize based on extender-implemented priority functions. The returned scores & weight
// are used to compute the weighted score for an extender. The weighted scores are added to
// the scores computed by Kubernetes scheduler. The total scores are used to do the host selection.
Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)
// Bind delegates the action of binding a pod to a node to the extender.
Bind(binding *v1.Binding) error
// IsBinder returns whether this extender is configured for the Bind method.
IsBinder() bool
// IsInterested returns true if at least one extended resource requested by
// this pod is managed by this extender.
IsInterested(pod *v1.Pod) bool
// ProcessPreemption returns nodes with their victim pods processed by extender based on
// given:
// 1. Pod to schedule
// 2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process.
// 3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled.
// The possible changes made by the extender may include:
// 1. A subset of the given candidate nodes after the extender's preemption phase.
// 2. A different set of victim pods for each given candidate node after the extender's preemption phase.
ProcessPreemption(
pod *v1.Pod,
nodeToVictims map[*v1.Node]*extenderv1.Victims,
nodeInfos framework.NodeInfoLister) (map[*v1.Node]*extenderv1.Victims, error)
// SupportsPreemption returns whether the scheduler extender supports preemption.
SupportsPreemption() bool
// IsIgnorable returns true to indicate that scheduling should not fail when this extender
// is unavailable. This gives the scheduler the ability to fail fast and tolerate non-critical extenders.
IsIgnorable() bool
}
// HTTPExtender implements the SchedulerExtender interface.
// HTTPExtender implements the Extender interface.
type HTTPExtender struct {
extenderURL string
preemptVerb string
@ -131,7 +83,7 @@ func makeTransport(config *schedulerapi.Extender) (http.RoundTripper, error) {
}
// NewHTTPExtender creates an HTTPExtender object.
func NewHTTPExtender(config *schedulerapi.Extender) (SchedulerExtender, error) {
func NewHTTPExtender(config *schedulerapi.Extender) (framework.Extender, error) {
if config.HTTPTimeout.Nanoseconds() == 0 {
config.HTTPTimeout = time.Duration(DefaultExtenderTimeout)
}
@ -212,9 +164,9 @@ func (h *HTTPExtender) SupportsPreemption() bool {
// ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender.
func (h *HTTPExtender) ProcessPreemption(
pod *v1.Pod,
nodeToVictims map[*v1.Node]*extenderv1.Victims,
nodeNameToVictims map[string]*extenderv1.Victims,
nodeInfos framework.NodeInfoLister,
) (map[*v1.Node]*extenderv1.Victims, error) {
) (map[string]*extenderv1.Victims, error) {
var (
result extenderv1.ExtenderPreemptionResult
args *extenderv1.ExtenderPreemptionArgs
@ -226,13 +178,12 @@ func (h *HTTPExtender) ProcessPreemption(
if h.nodeCacheCapable {
// If extender has cached node info, pass NodeNameToMetaVictims in args.
nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims)
nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeNameToVictims)
args = &extenderv1.ExtenderPreemptionArgs{
Pod: pod,
NodeNameToMetaVictims: nodeNameToMetaVictims,
}
} else {
nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims)
args = &extenderv1.ExtenderPreemptionArgs{
Pod: pod,
NodeNameToVictims: nodeNameToVictims,
@ -244,22 +195,22 @@ func (h *HTTPExtender) ProcessPreemption(
}
// Extender will always return NodeNameToMetaVictims.
// So let's convert it to NodeToVictims by using NodeNameToInfo.
newNodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeInfos)
// So let's convert it to NodeNameToVictims by using <nodeInfos>.
newNodeNameToVictims, err := h.convertToNodeNameToVictims(result.NodeNameToMetaVictims, nodeInfos)
if err != nil {
return nil, err
}
// Do not override nodeToVictims
return newNodeToVictims, nil
// Do not override <nodeNameToVictims>.
return newNodeNameToVictims, nil
}
// convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
// convertToNodeNameToVictims converts "nodeNameToMetaVictims" from object identifiers,
// such as UIDs and names, to object pointers.
func (h *HTTPExtender) convertToNodeToVictims(
func (h *HTTPExtender) convertToNodeNameToVictims(
nodeNameToMetaVictims map[string]*extenderv1.MetaVictims,
nodeInfos framework.NodeInfoLister,
) (map[*v1.Node]*extenderv1.Victims, error) {
nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
) (map[string]*extenderv1.Victims, error) {
nodeNameToVictims := map[string]*extenderv1.Victims{}
for nodeName, metaVictims := range nodeNameToMetaVictims {
nodeInfo, err := nodeInfos.Get(nodeName)
if err != nil {
@ -275,9 +226,9 @@ func (h *HTTPExtender) convertToNodeToVictims(
}
victims.Pods = append(victims.Pods, pod)
}
nodeToVictims[nodeInfo.Node()] = victims
nodeNameToVictims[nodeName] = victims
}
return nodeToVictims, nil
return nodeNameToVictims, nil
}
// convertPodUIDToPod returns v1.Pod object for given MetaPod and node info.
@ -298,10 +249,10 @@ func (h *HTTPExtender) convertPodUIDToPod(
// convertToNodeNameToMetaVictims converts from struct type to meta types.
func convertToNodeNameToMetaVictims(
nodeToVictims map[*v1.Node]*extenderv1.Victims,
nodeNameToVictims map[string]*extenderv1.Victims,
) map[string]*extenderv1.MetaVictims {
nodeNameToVictims := map[string]*extenderv1.MetaVictims{}
for node, victims := range nodeToVictims {
nodeNameToMetaVictims := map[string]*extenderv1.MetaVictims{}
for node, victims := range nodeNameToVictims {
metaVictims := &extenderv1.MetaVictims{
Pods: []*extenderv1.MetaPod{},
}
@ -311,20 +262,9 @@ func convertToNodeNameToMetaVictims(
}
metaVictims.Pods = append(metaVictims.Pods, metaPod)
}
nodeNameToVictims[node.GetName()] = metaVictims
nodeNameToMetaVictims[node] = metaVictims
}
return nodeNameToVictims
}
// convertToNodeNameToVictims converts from node type to node name as key.
func convertToNodeNameToVictims(
nodeToVictims map[*v1.Node]*extenderv1.Victims,
) map[string]*extenderv1.Victims {
nodeNameToVictims := map[string]*extenderv1.Victims{}
for node, victims := range nodeToVictims {
nodeNameToVictims[node.GetName()] = victims
}
return nodeNameToVictims
return nodeNameToMetaVictims
}
// Filter based on extender implemented predicate functions. The filtered list is

View File

@ -159,36 +159,37 @@ func (f *FakeExtender) SupportsPreemption() bool {
func (f *FakeExtender) ProcessPreemption(
pod *v1.Pod,
nodeToVictims map[*v1.Node]*extenderv1.Victims,
nodeNameToVictims map[string]*extenderv1.Victims,
nodeInfos framework.NodeInfoLister,
) (map[*v1.Node]*extenderv1.Victims, error) {
nodeToVictimsCopy := map[*v1.Node]*extenderv1.Victims{}
// We don't want to change the original nodeToVictims
for k, v := range nodeToVictims {
) (map[string]*extenderv1.Victims, error) {
nodeNameToVictimsCopy := map[string]*extenderv1.Victims{}
// We don't want to change the original nodeNameToVictims
for k, v := range nodeNameToVictims {
// In a real-world implementation, the extender's user should have their own way to get the node object
// by name if needed (e.g. by querying kube-apiserver).
//
// For test purpose, we just use node from parameters directly.
nodeToVictimsCopy[k] = v
nodeNameToVictimsCopy[k] = v
}
for node, victims := range nodeToVictimsCopy {
for nodeName, victims := range nodeNameToVictimsCopy {
// Try to do preemption on extender side.
extenderVictimPods, extendernPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, node)
nodeInfo, _ := nodeInfos.Get(nodeName)
extenderVictimPods, extenderPDBViolations, fits, err := f.selectVictimsOnNodeByExtender(pod, nodeInfo.Node())
if err != nil {
return nil, err
}
// If the pod still does not fit after the extender's preemption, this node cannot be helped by preemption at all,
// so remove it from the potential preemption nodes.
if !fits {
delete(nodeToVictimsCopy, node)
delete(nodeNameToVictimsCopy, nodeName)
} else {
// Append new victims to original victims
nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...)
nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + int64(extendernPDBViolations)
nodeNameToVictimsCopy[nodeName].Pods = append(victims.Pods, extenderVictimPods...)
nodeNameToVictimsCopy[nodeName].NumPDBViolations = victims.NumPDBViolations + int64(extenderPDBViolations)
}
}
return nodeToVictimsCopy, nil
return nodeNameToVictimsCopy, nil
}
// selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side.
@ -352,7 +353,7 @@ func (f *FakeExtender) IsInterested(pod *v1.Pod) bool {
return !f.unInterested
}
var _ SchedulerExtender = &FakeExtender{}
var _ framework.Extender = &FakeExtender{}
func TestGenericSchedulerWithExtenders(t *testing.T) {
tests := []struct {
@ -574,7 +575,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
client := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(client, 0)
extenders := []SchedulerExtender{}
extenders := []framework.Extender{}
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}

View File

@ -105,10 +105,10 @@ type ScheduleAlgorithm interface {
// the pod by preempting lower priority pods if possible.
// It returns the node where preemption happened, a list of preempted pods, a
// list of pods whose nominated node name should be removed, and error if any.
Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode *v1.Node, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
Preempt(context.Context, *profile.Profile, *framework.CycleState, *v1.Pod, error) (selectedNode string, preemptedPods []*v1.Pod, cleanupNominatedPods []*v1.Pod, err error)
// Extenders returns a slice of extender config. This is exposed for
// testing.
Extenders() []SchedulerExtender
Extenders() []framework.Extender
}
// ScheduleResult represents the result of one pod scheduled. It will contain
@ -125,7 +125,7 @@ type ScheduleResult struct {
type genericScheduler struct {
cache internalcache.Cache
schedulingQueue internalqueue.SchedulingQueue
extenders []SchedulerExtender
extenders []framework.Extender
nodeInfoSnapshot *internalcache.Snapshot
pvcLister corelisters.PersistentVolumeClaimLister
pdbLister policylisters.PodDisruptionBudgetLister
@ -210,7 +210,7 @@ func (g *genericScheduler) Schedule(ctx context.Context, prof *profile.Profile,
}, err
}
func (g *genericScheduler) Extenders() []SchedulerExtender {
func (g *genericScheduler) Extenders() []framework.Extender {
return g.extenders
}
@ -251,74 +251,74 @@ func (g *genericScheduler) selectHost(nodeScoreList framework.NodeScoreList) (st
// other pods with the same priority. The nominated pod prevents other pods from
// using the nominated resources and the nominated pod could take a long time
// before it is retried after many other pending pods.
func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
func (g *genericScheduler) Preempt(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
// Scheduler may return various types of errors. Consider preemption only if
// the error is of type FitError.
fitError, ok := scheduleErr.(*FitError)
if !ok || fitError == nil {
return nil, nil, nil, nil
return "", nil, nil, nil
}
if !podEligibleToPreemptOthers(pod, g.nodeInfoSnapshot.NodeInfos()) {
klog.V(5).Infof("Pod %v/%v is not eligible for more preemption.", pod.Namespace, pod.Name)
return nil, nil, nil, nil
return "", nil, nil, nil
}
allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()
if err != nil {
return nil, nil, nil, err
return "", nil, nil, err
}
if len(allNodes) == 0 {
return nil, nil, nil, ErrNoNodesAvailable
return "", nil, nil, ErrNoNodesAvailable
}
potentialNodes := nodesWherePreemptionMightHelp(allNodes, fitError)
if len(potentialNodes) == 0 {
klog.V(3).Infof("Preemption will not help schedule pod %v/%v on any node.", pod.Namespace, pod.Name)
// In this case, we should clean-up any existing nominated node name of the pod.
return nil, nil, []*v1.Pod{pod}, nil
return "", nil, []*v1.Pod{pod}, nil
}
var pdbs []*policy.PodDisruptionBudget
if g.pdbLister != nil {
pdbs, err = g.pdbLister.List(labels.Everything())
if err != nil {
return nil, nil, nil, err
return "", nil, nil, err
}
}
nodeToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
nodeNameToVictims, err := g.selectNodesForPreemption(ctx, prof, state, pod, potentialNodes, pdbs)
if err != nil {
return nil, nil, nil, err
return "", nil, nil, err
}
// We will only check nodeToVictims with extenders that support preemption.
// We will only check nodeNameToVictims with extenders that support preemption.
// Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated
// node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles.
nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims)
nodeNameToVictims, err = g.processPreemptionWithExtenders(pod, nodeNameToVictims)
if err != nil {
return nil, nil, nil, err
return "", nil, nil, err
}
candidateNode := pickOneNodeForPreemption(nodeToVictims)
if candidateNode == nil {
return nil, nil, nil, nil
candidateNode := pickOneNodeForPreemption(nodeNameToVictims)
if len(candidateNode) == 0 {
return "", nil, nil, nil
}
// Lower priority pods nominated to run on this node, may no longer fit on
// this node. So, we should remove their nomination. Removing their
// nomination updates these pods and moves them to the active queue. It
// lets scheduler find another place for them.
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name)
return candidateNode, nodeToVictims[candidateNode].Pods, nominatedPods, nil
nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode)
return candidateNode, nodeNameToVictims[candidateNode].Pods, nominatedPods, nil
}
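Since Preempt now reports the nominated node as a plain string, with the empty string meaning no candidate was found, callers can test the result with a length check instead of a nil check on a node object. A minimal sketch of that calling pattern follows; pickCandidate is an illustrative stand-in, not a function from this change:

package main

import "fmt"

// pickCandidate stands in for the scheduler's string-returning selection:
// it returns "" when no candidate node survives the preemption logic.
func pickCandidate(victimCounts map[string]int) string {
	for name, n := range victimCounts {
		if n > 0 {
			return name
		}
	}
	return ""
}

func main() {
	candidate := pickCandidate(map[string]int{"node-a": 0, "node-b": 2})
	if len(candidate) == 0 {
		fmt.Println("no node can be made schedulable by preemption")
		return
	}
	fmt.Println("nominating node", candidate)
}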
// processPreemptionWithExtenders processes preemption with extenders
func (g *genericScheduler) processPreemptionWithExtenders(
pod *v1.Pod,
nodeToVictims map[*v1.Node]*extenderv1.Victims,
) (map[*v1.Node]*extenderv1.Victims, error) {
if len(nodeToVictims) > 0 {
nodeNameToVictims map[string]*extenderv1.Victims,
) (map[string]*extenderv1.Victims, error) {
if len(nodeNameToVictims) > 0 {
for _, extender := range g.extenders {
if extender.SupportsPreemption() && extender.IsInterested(pod) {
newNodeToVictims, err := extender.ProcessPreemption(
newNodeNameToVictims, err := extender.ProcessPreemption(
pod,
nodeToVictims,
nodeNameToVictims,
g.nodeInfoSnapshot.NodeInfos(),
)
if err != nil {
@ -330,19 +330,19 @@ func (g *genericScheduler) processPreemptionWithExtenders(
return nil, err
}
// Replace nodeToVictims with new result after preemption. So the
// Replace nodeNameToVictims with new result after preemption. So the
// rest of the extenders can continue to use it as a parameter.
nodeToVictims = newNodeToVictims
nodeNameToVictims = newNodeNameToVictims
// If node list becomes empty, no preemption can happen regardless of other extenders.
if len(nodeToVictims) == 0 {
if len(nodeNameToVictims) == 0 {
break
}
}
}
}
return nodeToVictims, nil
return nodeNameToVictims, nil
}
// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
@ -719,12 +719,12 @@ func (g *genericScheduler) prioritizeNodes(
// 6. If there are still ties, the first such node is picked (sort of randomly).
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
// allocation and garbage collection time.
func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *v1.Node {
func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
if len(nodesToVictims) == 0 {
return nil
return ""
}
minNumPDBViolatingPods := int64(math.MaxInt32)
var minNodes1 []*v1.Node
var minNodes1 []string
lenNodes1 := 0
for node, victims := range nodesToVictims {
if len(victims.Pods) == 0 {
@ -752,7 +752,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*extenderv1.Victims) *
// There are more than one node with minimum number PDB violating pods. Find
// the one with minimum highest priority victim.
minHighestPriority := int32(math.MaxInt32)
var minNodes2 = make([]*v1.Node, lenNodes1)
var minNodes2 = make([]string, lenNodes1)
lenNodes2 := 0
for i := 0; i < lenNodes1; i++ {
node := minNodes1[i]
@ -855,8 +855,8 @@ func (g *genericScheduler) selectNodesForPreemption(
pod *v1.Pod,
potentialNodes []*framework.NodeInfo,
pdbs []*policy.PodDisruptionBudget,
) (map[*v1.Node]*extenderv1.Victims, error) {
nodeToVictims := map[*v1.Node]*extenderv1.Victims{}
) (map[string]*extenderv1.Victims, error) {
nodeNameToVictims := map[string]*extenderv1.Victims{}
var resultLock sync.Mutex
checkNode := func(i int) {
@ -869,12 +869,12 @@ func (g *genericScheduler) selectNodesForPreemption(
Pods: pods,
NumPDBViolations: int64(numPDBViolations),
}
nodeToVictims[potentialNodes[i].Node()] = &victims
nodeNameToVictims[potentialNodes[i].Node().Name] = &victims
resultLock.Unlock()
}
}
parallelize.Until(ctx, len(potentialNodes), checkNode)
return nodeToVictims, nil
return nodeNameToVictims, nil
}
// filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods"
@ -1103,7 +1103,7 @@ func NewGenericScheduler(
cache internalcache.Cache,
podQueue internalqueue.SchedulingQueue,
nodeInfoSnapshot *internalcache.Snapshot,
extenders []SchedulerExtender,
extenders []framework.Extender,
pvcLister corelisters.PersistentVolumeClaimLister,
pdbLister policylisters.PodDisruptionBudgetLister,
disablePreemption bool,

View File

@ -813,7 +813,7 @@ func TestGenericScheduler(t *testing.T) {
cache,
internalqueue.NewSchedulingQueue(nil),
snapshot,
[]SchedulerExtender{},
[]framework.Extender{},
pvcLister,
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
@ -1134,7 +1134,7 @@ func TestZeroRequest(t *testing.T) {
nil,
nil,
emptySnapshot,
[]SchedulerExtender{},
[]framework.Extender{},
nil,
nil,
false,
@ -1161,10 +1161,10 @@ func TestZeroRequest(t *testing.T) {
}
}
func printNodeToVictims(nodeToVictims map[*v1.Node]*extenderv1.Victims) string {
func printNodeNameToVictims(nodeNameToVictims map[string]*extenderv1.Victims) string {
var output string
for node, victims := range nodeToVictims {
output += node.Name + ": ["
for nodeName, victims := range nodeNameToVictims {
output += nodeName + ": ["
for _, pod := range victims.Pods {
output += pod.Name + ", "
}
@ -1178,12 +1178,12 @@ type victims struct {
numPDBViolations int64
}
func checkPreemptionVictims(expected map[string]victims, nodeToPods map[*v1.Node]*extenderv1.Victims) error {
func checkPreemptionVictims(expected map[string]victims, nodeToPods map[string]*extenderv1.Victims) error {
if len(expected) == len(nodeToPods) {
for k, victims := range nodeToPods {
if expVictims, ok := expected[k.Name]; ok {
if expVictims, ok := expected[k]; ok {
if len(victims.Pods) != len(expVictims.pods) {
return fmt.Errorf("unexpected number of pods. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods))
return fmt.Errorf("unexpected number of pods. expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods))
}
prevPriority := int32(math.MaxInt32)
for _, p := range victims.Pods {
@ -1200,11 +1200,11 @@ func checkPreemptionVictims(expected map[string]victims, nodeToPods map[*v1.Node
return fmt.Errorf("unexpected numPDBViolations. expected: %d, got: %d", expVictims.numPDBViolations, victims.NumPDBViolations)
}
} else {
return fmt.Errorf("unexpected machines. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods))
return fmt.Errorf("unexpected machines. expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods))
}
}
} else {
return fmt.Errorf("unexpected number of machines. expected: %v, got: %v", expected, printNodeToVictims(nodeToPods))
return fmt.Errorf("unexpected number of machines. expected: %v, got: %v", expected, printNodeNameToVictims(nodeToPods))
}
return nil
}
@ -1613,7 +1613,7 @@ func TestSelectNodesForPreemption(t *testing.T) {
nil,
internalqueue.NewSchedulingQueue(nil),
snapshot,
[]SchedulerExtender{},
[]framework.Extender{},
nil,
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
@ -1916,7 +1916,7 @@ func TestPickOneNodeForPreemption(t *testing.T) {
node := pickOneNodeForPreemption(candidateNodes)
found := false
for _, nodeName := range test.expected {
if node.Name == nodeName {
if node == nodeName {
found = true
break
}
@ -2391,7 +2391,7 @@ func TestPreempt(t *testing.T) {
cachedNodeInfo.SetNode(node)
cachedNodeInfoMap[node.Name] = cachedNodeInfo
}
var extenders []SchedulerExtender
var extenders []framework.Extender
for _, extender := range test.extenders {
// Set nodeInfoMap as extenders cached node information.
extender.cachedNodeNameToInfo = cachedNodeInfoMap
@ -2429,10 +2429,10 @@ func TestPreempt(t *testing.T) {
if err != nil {
t.Errorf("unexpected error in preemption: %v", err)
}
if node != nil && node.Name != test.expectedNode {
t.Errorf("expected node: %v, got: %v", test.expectedNode, node.GetName())
if len(node) != 0 && node != test.expectedNode {
t.Errorf("expected node: %v, got: %v", test.expectedNode, node)
}
if node == nil && len(test.expectedNode) != 0 {
if len(node) == 0 && len(test.expectedNode) != 0 {
t.Errorf("expected node: %v, got: nothing", test.expectedNode)
}
if len(victims) != len(test.expectedPods) {
@ -2452,14 +2452,14 @@ func TestPreempt(t *testing.T) {
// Mark the victims for deletion and record the preemptor's nominated node name.
now := metav1.Now()
victim.DeletionTimestamp = &now
test.pod.Status.NominatedNodeName = node.Name
test.pod.Status.NominatedNodeName = node
}
// Call preempt again and make sure it doesn't preempt any more pods.
node, victims, _, err = scheduler.Preempt(context.Background(), prof, state, test.pod, error(&FitError{Pod: test.pod, FilteredNodesStatuses: failedNodeToStatusMap}))
if err != nil {
t.Errorf("unexpected error in preemption: %v", err)
}
if node != nil && len(victims) > 0 {
if len(node) != 0 && len(victims) > 0 {
t.Errorf("didn't expect any more preemption. Node %v is selected for preemption.", node)
}
close(stop)

View File

@ -122,10 +122,10 @@ func (c *Configurator) buildFramework(p schedulerapi.KubeSchedulerProfile) (fram
// create a scheduler from a set of registered plugins.
func (c *Configurator) create() (*Scheduler, error) {
var extenders []core.SchedulerExtender
var extenders []framework.Extender
var ignoredExtendedResources []string
if len(c.extenders) != 0 {
var ignorableExtenders []core.SchedulerExtender
var ignorableExtenders []framework.Extender
for ii := range c.extenders {
klog.V(2).Infof("Creating extender with config %+v", c.extenders[ii])
extender, err := core.NewHTTPExtender(&c.extenders[ii])

View File

@ -509,10 +509,10 @@ func (f *fakeExtender) IsIgnorable() bool {
}
func (f *fakeExtender) ProcessPreemption(
pod *v1.Pod,
nodeToVictims map[*v1.Node]*extenderv1.Victims,
nodeInfos framework.NodeInfoLister,
) (map[*v1.Node]*extenderv1.Victims, error) {
_ *v1.Pod,
_ map[string]*extenderv1.Victims,
_ framework.NodeInfoLister,
) (map[string]*extenderv1.Victims, error) {
return nil, nil
}
@ -525,8 +525,8 @@ func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v
}
func (f *fakeExtender) Prioritize(
pod *v1.Pod,
nodes []*v1.Node,
_ *v1.Pod,
_ []*v1.Node,
) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error) {
return nil, 0, nil
}

View File

@ -4,6 +4,7 @@ go_library(
name = "go_default_library",
srcs = [
"cycle_state.go",
"extender.go",
"framework.go",
"interface.go",
"listers.go",
@ -34,6 +35,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/component-base/metrics:go_default_library",
"//staging/src/k8s.io/kube-scheduler/config/v1alpha2:go_default_library",
"//staging/src/k8s.io/kube-scheduler/extender/v1:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],

View File

@ -0,0 +1,70 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
v1 "k8s.io/api/core/v1"
extenderv1 "k8s.io/kube-scheduler/extender/v1"
)
// Extender is an interface for external processes to influence scheduling
// decisions made by Kubernetes. This is typically needed for resources not directly
// managed by Kubernetes.
type Extender interface {
// Name returns a unique name that identifies the extender.
Name() string
// Filter based on extender-implemented predicate functions. The filtered list is
// expected to be a subset of the supplied list. failedNodesMap optionally contains
// the list of failed nodes and failure reasons.
Filter(pod *v1.Pod, nodes []*v1.Node) (filteredNodes []*v1.Node, failedNodesMap extenderv1.FailedNodesMap, err error)
// Prioritize based on extender-implemented priority functions. The returned scores & weight
// are used to compute the weighted score for an extender. The weighted scores are added to
// the scores computed by Kubernetes scheduler. The total scores are used to do the host selection.
Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *extenderv1.HostPriorityList, weight int64, err error)
// Bind delegates the action of binding a pod to a node to the extender.
Bind(binding *v1.Binding) error
// IsBinder returns whether this extender is configured for the Bind method.
IsBinder() bool
// IsInterested returns true if at least one extended resource requested by
// this pod is managed by this extender.
IsInterested(pod *v1.Pod) bool
// ProcessPreemption returns nodes with their victim pods processed by extender based on
// given:
// 1. Pod to schedule
// 2. Candidate nodes and victim pods (nodeNameToVictims) generated by previous scheduling process.
// The possible changes made by the extender may include:
// 1. A subset of the given candidate nodes after the extender's preemption phase.
// 2. A different set of victim pods for each given candidate node after the extender's preemption phase.
ProcessPreemption(
pod *v1.Pod,
nodeNameToVictims map[string]*extenderv1.Victims,
nodeInfos NodeInfoLister,
) (map[string]*extenderv1.Victims, error)
// SupportsPreemption returns whether the scheduler extender supports preemption.
SupportsPreemption() bool
// IsIgnorable returns true to indicate that scheduling should not fail when this extender
// is unavailable. This gives the scheduler the ability to fail fast and tolerate non-critical extenders.
IsIgnorable() bool
}
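For orientation, below is a hypothetical do-nothing extender written against the relocated interface; the noopExtender type, package name, and import path are assumptions for illustration only, while the method set and the name-keyed ProcessPreemption signature follow the interface defined above:

// Hypothetical illustration only: a do-nothing extender compiled against the
// interface as moved into the framework package. The import path below is an
// assumption based on this tree's layout (pkg/scheduler/framework/v1alpha1).
package extenderexample

import (
	v1 "k8s.io/api/core/v1"
	extenderv1 "k8s.io/kube-scheduler/extender/v1"

	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// noopExtender mainly demonstrates the name-keyed ProcessPreemption signature.
type noopExtender struct{}

func (noopExtender) Name() string { return "noop" }

func (noopExtender) Filter(pod *v1.Pod, nodes []*v1.Node) ([]*v1.Node, extenderv1.FailedNodesMap, error) {
	// Pass every node through; a real extender would filter here.
	return nodes, nil, nil
}

func (noopExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*extenderv1.HostPriorityList, int64, error) {
	return &extenderv1.HostPriorityList{}, 0, nil
}

func (noopExtender) Bind(binding *v1.Binding) error { return nil }
func (noopExtender) IsBinder() bool                 { return false }
func (noopExtender) IsInterested(pod *v1.Pod) bool  { return false }

func (noopExtender) ProcessPreemption(
	pod *v1.Pod,
	nodeNameToVictims map[string]*extenderv1.Victims,
	nodeInfos framework.NodeInfoLister,
) (map[string]*extenderv1.Victims, error) {
	// A real extender would prune candidates or adjust victims; echoing the
	// input back means "no changes from this extender".
	return nodeNameToVictims, nil
}

func (noopExtender) SupportsPreemption() bool { return true }
func (noopExtender) IsIgnorable() bool        { return true }

// Compile-time check that the sketch satisfies the relocated interface.
var _ framework.Extender = noopExtender{}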

View File

@ -393,14 +393,12 @@ func (sched *Scheduler) preempt(ctx context.Context, prof *profile.Profile, stat
return "", err
}
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr)
nodeName, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, prof, state, preemptor, scheduleErr)
if err != nil {
klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err)
return "", err
}
var nodeName = ""
if node != nil {
nodeName = node.Name
if len(nodeName) != 0 {
// Update the scheduling queue with the nominated pod information. Without
// this, there would be a race condition between the next scheduling cycle
// and the time the scheduler receives a Pod Update for the nominated pod.

View File

@ -144,11 +144,11 @@ func (es mockScheduler) Schedule(ctx context.Context, profile *profile.Profile,
return es.result, es.err
}
func (es mockScheduler) Extenders() []core.SchedulerExtender {
func (es mockScheduler) Extenders() []framework.Extender {
return nil
}
func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (*v1.Node, []*v1.Pod, []*v1.Pod, error) {
return nil, nil, nil, nil
func (es mockScheduler) Preempt(ctx context.Context, i *profile.Profile, state *framework.CycleState, pod *v1.Pod, scheduleErr error) (string, []*v1.Pod, []*v1.Pod, error) {
return "", nil, nil, nil
}
func TestSchedulerCreation(t *testing.T) {
@ -813,7 +813,7 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache internalcache.C
scache,
internalqueue.NewSchedulingQueue(nil),
internalcache.NewEmptySnapshot(),
[]core.SchedulerExtender{},
[]framework.Extender{},
informerFactory.Core().V1().PersistentVolumeClaims().Lister(),
informerFactory.Policy().V1beta1().PodDisruptionBudgets().Lister(),
false,
@ -1102,14 +1102,14 @@ priorities:
func TestSchedulerBinding(t *testing.T) {
table := []struct {
podName string
extenders []core.SchedulerExtender
extenders []framework.Extender
wantBinderID int
name string
}{
{
name: "the extender is not a binder",
podName: "pod0",
extenders: []core.SchedulerExtender{
extenders: []framework.Extender{
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
},
wantBinderID: -1, // default binding.
@ -1117,7 +1117,7 @@ func TestSchedulerBinding(t *testing.T) {
{
name: "one of the extenders is a binder and interested in pod",
podName: "pod0",
extenders: []core.SchedulerExtender{
extenders: []framework.Extender{
&fakeExtender{isBinder: false, interestedPodName: "pod0"},
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
},
@ -1126,7 +1126,7 @@ func TestSchedulerBinding(t *testing.T) {
{
name: "one of the extenders is a binder, but not interested in pod",
podName: "pod1",
extenders: []core.SchedulerExtender{
extenders: []framework.Extender{
&fakeExtender{isBinder: false, interestedPodName: "pod1"},
&fakeExtender{isBinder: true, interestedPodName: "pod0"},
},