From 71603f2f85870fe25b49f2aeb57dd6e6ab7ceff6 Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Fri, 17 Nov 2017 10:21:03 +0800 Subject: [PATCH 1/5] Add preemption in scheduler extender Add verb and preemption for scheduler extender Update bazel Use simple preemption in extender Use node name instead of v1.Node Fix support method Fix preemption dup Remove uneeded logics Remove nodeInfo from param to extender Update bazel for scheduler types Mock extender cache with nodeInfo Add nodeInfo as extender cache Choose node name or node based on cache flag Always return meta victims in result --- ...scheduler-policy-config-with-extender.json | 1 + .../algorithm/scheduler_interface.go | 21 ++- pkg/scheduler/api/types.go | 41 ++++- pkg/scheduler/api/v1/types.go | 41 ++++- pkg/scheduler/api/v1/zz_generated.deepcopy.go | 161 +++++++++++++++++- .../api/validation/validation_test.go | 4 + pkg/scheduler/api/zz_generated.deepcopy.go | 161 +++++++++++++++++- pkg/scheduler/core/BUILD | 1 + pkg/scheduler/core/extender.go | 124 +++++++++++++- pkg/scheduler/core/extender_test.go | 146 ++++++++++++++-- pkg/scheduler/core/generic_scheduler.go | 143 ++++++++-------- pkg/scheduler/core/generic_scheduler_test.go | 21 ++- test/integration/scheduler/extender_test.go | 6 +- 13 files changed, 767 insertions(+), 104 deletions(-) diff --git a/examples/scheduler-policy-config-with-extender.json b/examples/scheduler-policy-config-with-extender.json index cc1a9bd6fbd..fb5e1381353 100644 --- a/examples/scheduler-policy-config-with-extender.json +++ b/examples/scheduler-policy-config-with-extender.json @@ -20,6 +20,7 @@ "filterVerb": "filter", "bindVerb": "bind", "prioritizeVerb": "prioritize", + "preemptVerb": "preempt", "weight": 5, "enableHttps": false, "nodeCacheCapable": false diff --git a/pkg/scheduler/algorithm/scheduler_interface.go b/pkg/scheduler/algorithm/scheduler_interface.go index f86015b491c..61c740c60a3 100644 --- a/pkg/scheduler/algorithm/scheduler_interface.go +++ b/pkg/scheduler/algorithm/scheduler_interface.go @@ -29,7 +29,9 @@ type SchedulerExtender interface { // Filter based on extender-implemented predicate functions. The filtered list is // expected to be a subset of the supplied list. failedNodesMap optionally contains // the list of failed nodes and failure reasons. - Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) + Filter(pod *v1.Pod, + nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo, + ) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) // Prioritize based on extender-implemented priority functions. The returned scores & weight // are used to compute the weighted score for an extender. The weighted scores are added to @@ -45,6 +47,23 @@ type SchedulerExtender interface { // IsInterested returns true if at least one extended resource requested by // this pod is managed by this extender. IsInterested(pod *v1.Pod) bool + + // ProcessPreemption returns nodes with their victim pods processed by extender based on + // given: + // 1. Pod to schedule + // 2. Candidate nodes and victim pods (nodeToVictims) generated by previous scheduling process. + // 3. nodeNameToInfo to restore v1.Node from node name if extender cache is enabled. + // The possible changes made by extender may include: + // 1. Subset of given candidate nodes after preemption phase of extender. + // 2. 
A different set of victim pod for every given candidate node after preemption phase of extender. + ProcessPreemption( + pod *v1.Pod, + nodeToVictims map[*v1.Node]*schedulerapi.Victims, + nodeNameToInfo map[string]*schedulercache.NodeInfo, + ) (map[*v1.Node]*schedulerapi.Victims, error) + + // SupportsPreemption returns if the scheduler extender support preemption or not. + SupportsPreemption() bool } // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index cfc8d219ec1..de985d9c822 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -168,6 +168,8 @@ type ExtenderConfig struct { URLPrefix string // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender. FilterVerb string + // Verb for the preempt call, empty if not supported. This verb is appended to the URLPrefix when issuing the preempt call to extender. + PreemptVerb string // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender. PrioritizeVerb string // The numeric multiplier for the node scores that the prioritize call generates. @@ -200,11 +202,48 @@ type ExtenderConfig struct { ManagedResources []ExtenderManagedResource } +// ExtenderPreemptionResult represents the result returned by preemption phase of extender. +type ExtenderPreemptionResult struct { + NodeNameToMetaVictims map[string]*MetaVictims +} + +// ExtenderPreemptionArgs represents the arguments needed by the extender to preempt pods on nodes. +type ExtenderPreemptionArgs struct { + // Pod being scheduled + Pod *v1.Pod + // Victims map generated by scheduler preemption phase + // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeToVictims. + NodeToVictims map[*v1.Node]*Victims + NodeNameToMetaVictims map[string]*MetaVictims +} + +// Victims represents: +// pods: a group of pods expected to be preempted. +// numPDBViolations: the count of violations of PodDisruptionBudget +type Victims struct { + Pods []*v1.Pod + NumPDBViolations int +} + +// MetaPod represent identifier for a v1.Pod +type MetaPod struct { + UID string +} + +// MetaVictims represents: +// pods: a group of pods expected to be preempted. +// Only Pod identifiers will be sent and user are expect to get v1.Pod in their own way. +// numPDBViolations: the count of violations of PodDisruptionBudget +type MetaVictims struct { + Pods []*MetaPod + NumPDBViolations int +} + // ExtenderArgs represents the arguments needed by the extender to filter/prioritize // nodes for a pod. type ExtenderArgs struct { // Pod being scheduled - Pod v1.Pod + Pod *v1.Pod // List of candidate nodes where the pod can be scheduled; to be populated // only if ExtenderConfig.NodeCacheCapable == false Nodes *v1.NodeList diff --git a/pkg/scheduler/api/v1/types.go b/pkg/scheduler/api/v1/types.go index 32ac2579588..e1722cc9caf 100644 --- a/pkg/scheduler/api/v1/types.go +++ b/pkg/scheduler/api/v1/types.go @@ -142,6 +142,8 @@ type ExtenderConfig struct { URLPrefix string `json:"urlPrefix"` // Verb for the filter call, empty if not supported. This verb is appended to the URLPrefix when issuing the filter call to extender. FilterVerb string `json:"filterVerb,omitempty"` + // Verb for the preempt call, empty if not supported. This verb is appended to the URLPrefix when issuing the preempt call to extender. 
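+	// For example, with URLPrefix "http://127.0.0.1:8081/extender" and
+	// PreemptVerb "preempt", the preempt call is issued to
+	// "http://127.0.0.1:8081/extender/preempt".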
+ PreemptVerb string `json:"preemptVerb,omitempty"` // Verb for the prioritize call, empty if not supported. This verb is appended to the URLPrefix when issuing the prioritize call to extender. PrioritizeVerb string `json:"prioritizeVerb,omitempty"` // The numeric multiplier for the node scores that the prioritize call generates. @@ -178,7 +180,7 @@ type ExtenderConfig struct { // nodes for a pod. type ExtenderArgs struct { // Pod being scheduled - Pod apiv1.Pod `json:"pod"` + Pod *apiv1.Pod `json:"pod"` // List of candidate nodes where the pod can be scheduled; to be populated // only if ExtenderConfig.NodeCacheCapable == false Nodes *apiv1.NodeList `json:"nodes,omitempty"` @@ -187,6 +189,43 @@ type ExtenderArgs struct { NodeNames *[]string `json:"nodenames,omitempty"` } +// ExtenderPreemptionResult represents the result returned by preemption phase of extender. +type ExtenderPreemptionResult struct { + NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"` +} + +// ExtenderPreemptionArgs represents the arguments needed by the extender to preempt pods on nodes. +type ExtenderPreemptionArgs struct { + // Pod being scheduled + Pod *apiv1.Pod `json:"pod"` + // Victims map generated by scheduler preemption phase + // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeToVictims. + NodeToVictims map[*apiv1.Node]*Victims `json:"nodeToVictims,omitempty"` + NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"` +} + +// Victims represents: +// pods: a group of pods expected to be preempted. +// numPDBViolations: the count of violations of PodDisruptionBudget +type Victims struct { + Pods []*apiv1.Pod `json:"pods"` + NumPDBViolations int `json:"numPDBViolations"` +} + +// MetaPod represent identifier for a v1.Pod +type MetaPod struct { + UID string `json:"uid"` +} + +// MetaVictims represents: +// pods: a group of pods expected to be preempted. +// Only Pod identifiers will be sent and user are expect to get v1.Pod in their own way. +// numPDBViolations: the count of violations of PodDisruptionBudget +type MetaVictims struct { + Pods []*MetaPod `json:"pods"` + NumPDBViolations int `json:"numPDBViolations"` +} + // FailedNodesMap represents the filtered out nodes, with node names and failure messages type FailedNodesMap map[string]string diff --git a/pkg/scheduler/api/v1/zz_generated.deepcopy.go b/pkg/scheduler/api/v1/zz_generated.deepcopy.go index 98f8fcbf7ef..77efc9a175a 100644 --- a/pkg/scheduler/api/v1/zz_generated.deepcopy.go +++ b/pkg/scheduler/api/v1/zz_generated.deepcopy.go @@ -29,7 +29,15 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExtenderArgs) DeepCopyInto(out *ExtenderArgs) { *out = *in - in.Pod.DeepCopyInto(&out.Pod) + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + if *in == nil { + *out = nil + } else { + *out = new(core_v1.Pod) + (*in).DeepCopyInto(*out) + } + } if in.Nodes != nil { in, out := &in.Nodes, &out.Nodes if *in == nil { @@ -210,6 +218,85 @@ func (in FailedNodesMap) DeepCopy() FailedNodesMap { return *out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) { + *out = *in + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + if *in == nil { + *out = nil + } else { + *out = new(core_v1.Pod) + (*in).DeepCopyInto(*out) + } + } + if in.NodeToVictims != nil { + in, out := &in.NodeToVictims, &out.NodeToVictims + *out = make(map[*core_v1.Node]*Victims, len(*in)) + for range *in { + // FIXME: Copying unassignable keys unsupported *core_v1.Node + } + } + if in.NodeNameToMetaVictims != nil { + in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims + *out = make(map[string]*MetaVictims, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(MetaVictims) + val.DeepCopyInto((*out)[key]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionArgs. +func (in *ExtenderPreemptionArgs) DeepCopy() *ExtenderPreemptionArgs { + if in == nil { + return nil + } + out := new(ExtenderPreemptionArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult) { + *out = *in + if in.NodeNameToMetaVictims != nil { + in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims + *out = make(map[string]*MetaVictims, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(MetaVictims) + val.DeepCopyInto((*out)[key]) + } + } + } + if in.NodeToVictims != nil { + in, out := &in.NodeToVictims, &out.NodeToVictims + *out = make(map[*core_v1.Node]*Victims, len(*in)) + for range *in { + // FIXME: Copying unassignable keys unsupported *core_v1.Node + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionResult. +func (in *ExtenderPreemptionResult) DeepCopy() *ExtenderPreemptionResult { + if in == nil { + return nil + } + out := new(ExtenderPreemptionResult) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HostPriority) DeepCopyInto(out *HostPriority) { *out = *in @@ -283,6 +370,50 @@ func (in *LabelsPresence) DeepCopy() *LabelsPresence { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetaPod) DeepCopyInto(out *MetaPod) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaPod. +func (in *MetaPod) DeepCopy() *MetaPod { + if in == nil { + return nil + } + out := new(MetaPod) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetaVictims) DeepCopyInto(out *MetaVictims) { + *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*MetaPod, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(MetaPod) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaVictims. 
+func (in *MetaVictims) DeepCopy() *MetaVictims { + if in == nil { + return nil + } + out := new(MetaVictims) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Policy) DeepCopyInto(out *Policy) { *out = *in @@ -483,3 +614,31 @@ func (in *ServiceAntiAffinity) DeepCopy() *ServiceAntiAffinity { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Victims) DeepCopyInto(out *Victims) { + *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*core_v1.Pod, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(core_v1.Pod) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Victims. +func (in *Victims) DeepCopy() *Victims { + if in == nil { + return nil + } + out := new(Victims) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/scheduler/api/validation/validation_test.go b/pkg/scheduler/api/validation/validation_test.go index ede1b2c4216..9c3a5cd355d 100644 --- a/pkg/scheduler/api/validation/validation_test.go +++ b/pkg/scheduler/api/validation/validation_test.go @@ -61,6 +61,10 @@ func TestValidatePolicy(t *testing.T) { policy: api.Policy{ExtenderConfigs: []api.ExtenderConfig{{URLPrefix: "http://127.0.0.1:8081/extender", FilterVerb: "filter"}}}, expected: nil, }, + { + policy: api.Policy{ExtenderConfigs: []api.ExtenderConfig{{URLPrefix: "http://127.0.0.1:8081/extender", PreemptVerb: "preempt"}}}, + expected: nil, + }, { policy: api.Policy{ ExtenderConfigs: []api.ExtenderConfig{ diff --git a/pkg/scheduler/api/zz_generated.deepcopy.go b/pkg/scheduler/api/zz_generated.deepcopy.go index 1986933b93c..b765a201d48 100644 --- a/pkg/scheduler/api/zz_generated.deepcopy.go +++ b/pkg/scheduler/api/zz_generated.deepcopy.go @@ -29,7 +29,15 @@ import ( // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExtenderArgs) DeepCopyInto(out *ExtenderArgs) { *out = *in - in.Pod.DeepCopyInto(&out.Pod) + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + if *in == nil { + *out = nil + } else { + *out = new(v1.Pod) + (*in).DeepCopyInto(*out) + } + } if in.Nodes != nil { in, out := &in.Nodes, &out.Nodes if *in == nil { @@ -210,6 +218,85 @@ func (in FailedNodesMap) DeepCopy() FailedNodesMap { return *out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) { + *out = *in + if in.Pod != nil { + in, out := &in.Pod, &out.Pod + if *in == nil { + *out = nil + } else { + *out = new(v1.Pod) + (*in).DeepCopyInto(*out) + } + } + if in.NodeToVictims != nil { + in, out := &in.NodeToVictims, &out.NodeToVictims + *out = make(map[*v1.Node]*Victims, len(*in)) + for range *in { + // FIXME: Copying unassignable keys unsupported *v1.Node + } + } + if in.NodeNameToMetaVictims != nil { + in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims + *out = make(map[string]*MetaVictims, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(MetaVictims) + val.DeepCopyInto((*out)[key]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionArgs. +func (in *ExtenderPreemptionArgs) DeepCopy() *ExtenderPreemptionArgs { + if in == nil { + return nil + } + out := new(ExtenderPreemptionArgs) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult) { + *out = *in + if in.NodeNameToMetaVictims != nil { + in, out := &in.NodeNameToMetaVictims, &out.NodeNameToMetaVictims + *out = make(map[string]*MetaVictims, len(*in)) + for key, val := range *in { + if val == nil { + (*out)[key] = nil + } else { + (*out)[key] = new(MetaVictims) + val.DeepCopyInto((*out)[key]) + } + } + } + if in.NodeToVictims != nil { + in, out := &in.NodeToVictims, &out.NodeToVictims + *out = make(map[*v1.Node]*Victims, len(*in)) + for range *in { + // FIXME: Copying unassignable keys unsupported *v1.Node + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderPreemptionResult. +func (in *ExtenderPreemptionResult) DeepCopy() *ExtenderPreemptionResult { + if in == nil { + return nil + } + out := new(ExtenderPreemptionResult) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HostPriority) DeepCopyInto(out *HostPriority) { *out = *in @@ -283,6 +370,50 @@ func (in *LabelsPresence) DeepCopy() *LabelsPresence { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetaPod) DeepCopyInto(out *MetaPod) { + *out = *in + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaPod. +func (in *MetaPod) DeepCopy() *MetaPod { + if in == nil { + return nil + } + out := new(MetaPod) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *MetaVictims) DeepCopyInto(out *MetaVictims) { + *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*MetaPod, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(MetaPod) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MetaVictims. 
+func (in *MetaVictims) DeepCopy() *MetaVictims { + if in == nil { + return nil + } + out := new(MetaVictims) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Policy) DeepCopyInto(out *Policy) { *out = *in @@ -483,3 +614,31 @@ func (in *ServiceAntiAffinity) DeepCopy() *ServiceAntiAffinity { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Victims) DeepCopyInto(out *Victims) { + *out = *in + if in.Pods != nil { + in, out := &in.Pods, &out.Pods + *out = make([]*v1.Pod, len(*in)) + for i := range *in { + if (*in)[i] == nil { + (*out)[i] = nil + } else { + (*out)[i] = new(v1.Pod) + (*in)[i].DeepCopyInto((*out)[i]) + } + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Victims. +func (in *Victims) DeepCopy() *Victims { + if in == nil { + return nil + } + out := new(Victims) + in.DeepCopyInto(out) + return out +} diff --git a/pkg/scheduler/core/BUILD b/pkg/scheduler/core/BUILD index dd2823df3b4..8163951d7dc 100644 --- a/pkg/scheduler/core/BUILD +++ b/pkg/scheduler/core/BUILD @@ -27,6 +27,7 @@ go_test( "//vendor/k8s.io/api/apps/v1beta1:go_default_library", "//vendor/k8s.io/api/core/v1:go_default_library", "//vendor/k8s.io/api/extensions/v1beta1:go_default_library", + "//vendor/k8s.io/api/policy/v1beta1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library", diff --git a/pkg/scheduler/core/extender.go b/pkg/scheduler/core/extender.go index 022db87f5ba..14bd455cc03 100644 --- a/pkg/scheduler/core/extender.go +++ b/pkg/scheduler/core/extender.go @@ -41,6 +41,7 @@ const ( // HTTPExtender implements the algorithm.SchedulerExtender interface. type HTTPExtender struct { extenderURL string + preemptVerb string filterVerb string prioritizeVerb string bindVerb string @@ -93,6 +94,7 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx } return &HTTPExtender{ extenderURL: config.URLPrefix, + preemptVerb: config.PreemptVerb, filterVerb: config.FilterVerb, prioritizeVerb: config.PrioritizeVerb, bindVerb: config.BindVerb, @@ -103,10 +105,126 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx }, nil } +// SupportsPreemption returns if a extender support preemption. +// A extender should have preempt verb defined and enabled its own node cache. +func (h *HTTPExtender) SupportsPreemption() bool { + return len(h.preemptVerb) > 0 +} + +// ProcessPreemption returns filtered candidate nodes and victims after running preemption logic in extender. +func (h *HTTPExtender) ProcessPreemption( + pod *v1.Pod, + nodeToVictims map[*v1.Node]*schedulerapi.Victims, + nodeNameToInfo map[string]*schedulercache.NodeInfo, +) (map[*v1.Node]*schedulerapi.Victims, error) { + var ( + result schedulerapi.ExtenderPreemptionResult + args *schedulerapi.ExtenderPreemptionArgs + ) + + if !h.SupportsPreemption() { + return nil, fmt.Errorf("preempt verb is not defined for extender %v but run into ProcessPreemption", h.extenderURL) + } + + if h.nodeCacheCapable { + // If extender has cached node info, pass NodeNameToMetaVictims in args. 
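+		// With nodeCacheCapable == true the serialized args carry only node
+		// names and pod UIDs, e.g. (field names per the json tags in
+		// pkg/scheduler/api/v1/types.go; values illustrative):
+		//   {"pod":{...},"nodeNameToMetaVictims":{"node-a":{"pods":[{"uid":"u1"}],"numPDBViolations":0}}}
+		// The extender is expected to resolve these UIDs against its own cache.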
+ nodeNameToVictims := convertToNodeNameToMetaVictims(nodeToVictims) + args = &schedulerapi.ExtenderPreemptionArgs{ + Pod: pod, + NodeNameToMetaVictims: nodeNameToVictims, + } + } else { + args = &schedulerapi.ExtenderPreemptionArgs{ + Pod: pod, + NodeToVictims: nodeToVictims, + } + } + + if err := h.send(h.preemptVerb, args, &result); err != nil { + return nil, err + } + + // Extender will always return NodeNameToMetaVictims. + // So let's convert it to NodeToVictims by using NodeNameToInfo. + nodeToVictims, err := h.convertToNodeToVictims(result.NodeNameToMetaVictims, nodeNameToInfo) + if err != nil { + return nil, err + } + return nodeToVictims, nil + +} + +// convertToNodeToVictims converts from meta types to struct type. +func (h *HTTPExtender) convertToNodeToVictims( + nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims, + nodeNameToInfo map[string]*schedulercache.NodeInfo, +) (map[*v1.Node]*schedulerapi.Victims, error) { + nodeToVictims := map[*v1.Node]*schedulerapi.Victims{} + for nodeName, metaVictims := range nodeNameToMetaVictims { + victims := &schedulerapi.Victims{ + Pods: []*v1.Pod{}, + } + for _, metaPod := range metaVictims.Pods { + pod, err := h.restorePodFromNodeInfo(metaPod, nodeName, nodeNameToInfo) + if err != nil { + return nil, err + } + victims.Pods = append(victims.Pods, pod) + } + nodeToVictims[nodeNameToInfo[nodeName].Node()] = victims + } + return nodeToVictims, nil +} + +// restorePodFromNodeInfo returns v1.Pod object for given MetaPod and node name. +// The v1.Pod object is restored by nodeInfo.Pods(). +// It should return error if there's cache inconsistency between default scheduler and extender +// so that this pod or node is missing from nodeNameToInfo. +func (h *HTTPExtender) restorePodFromNodeInfo( + metaPod *schedulerapi.MetaPod, + nodeName string, + nodeNameToInfo map[string]*schedulercache.NodeInfo) (*v1.Pod, error) { + if nodeInfo, ok := nodeNameToInfo[nodeName]; ok { + for _, pod := range nodeInfo.Pods() { + if string(pod.UID) == metaPod.UID { + return pod, nil + } + } + return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node.", + h.extenderURL, metaPod, nodeInfo.Node().Name) + } else { + return nil, fmt.Errorf("extender: %v claims to preempt on node: %v but the node is not found in nodeNameToInfo map", + h.extenderURL, nodeInfo.Node().Name) + } +} + +// convertToNodeNameToMetaVictims converts from struct type to meta types. +func convertToNodeNameToMetaVictims( + nodeToVictims map[*v1.Node]*schedulerapi.Victims, +) map[string]*schedulerapi.MetaVictims { + nodeNameToVictims := map[string]*schedulerapi.MetaVictims{} + for node, victims := range nodeToVictims { + metaVictims := &schedulerapi.MetaVictims{ + Pods: []*schedulerapi.MetaPod{}, + } + for _, pod := range victims.Pods { + metaPod := &schedulerapi.MetaPod{ + UID: string(pod.UID), + } + metaVictims.Pods = append(metaVictims.Pods, metaPod) + } + nodeNameToVictims[node.GetName()] = metaVictims + } + return nodeNameToVictims +} + // Filter based on extender implemented predicate functions. The filtered list is // expected to be a subset of the supplied list. failedNodesMap optionally contains // the list of failed nodes and failure reasons. 
-func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { +func (h *HTTPExtender) Filter( + pod *v1.Pod, + nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo, +) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { var ( result schedulerapi.ExtenderFilterResult nodeList *v1.NodeList @@ -133,7 +251,7 @@ func (h *HTTPExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[ } args = &schedulerapi.ExtenderArgs{ - Pod: *pod, + Pod: pod, Nodes: nodeList, NodeNames: nodeNames, } @@ -193,7 +311,7 @@ func (h *HTTPExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (*schedulerapi. } args = &schedulerapi.ExtenderArgs{ - Pod: *pod, + Pod: pod, Nodes: nodeList, NodeNames: nodeNames, } diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go index 766ee2dec59..89bef4d8a25 100644 --- a/pkg/scheduler/core/extender_test.go +++ b/pkg/scheduler/core/extender_test.go @@ -22,12 +22,14 @@ import ( "time" "k8s.io/api/core/v1" + policy "k8s.io/api/policy/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/scheduler/algorithm" schedulerapi "k8s.io/kubernetes/pkg/scheduler/api" "k8s.io/kubernetes/pkg/scheduler/schedulercache" schedulertesting "k8s.io/kubernetes/pkg/scheduler/testing" + "k8s.io/kubernetes/pkg/scheduler/util" ) type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error) @@ -111,22 +113,140 @@ type FakeExtender struct { nodeCacheCapable bool filteredNodes []*v1.Node unInterested bool + + // Cached node information for fake extender + cachedNodeNameToInfo map[string]*schedulercache.NodeInfo + cachedPDBs []*policy.PodDisruptionBudget +} + +// selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on extender's side. +// Returns: +// 1. More victim pods (if any) amended by preemption phase of extender. +// 2. Number of violating victim (used to calculate PDB). +// 3. Fits or not after preemption phase on extender's side. +func (f *FakeExtender) selectVictimsOnNodeByExtender( + pod *v1.Pod, + node *v1.Node, +) ([]*v1.Pod, int, bool) { + // TODO(harry): add more test in generic_scheduler_test.go to verify this logic. + // If a extender support preemption but have no cached node info, let's run filter to make sure + // default scheduler's decision still stand with given pod and node. + if !f.nodeCacheCapable { + if fits, _ := f.runPredicate(pod, node); !fits { + return nil, 0, false + } + return []*v1.Pod{}, 0, true + } + + // Otherwise, as a extender support preemption and have cached node info, we will assume cachedNodeNameToInfo is available + // and get cached node info by given nodeName. + nodeInfoCopy := f.cachedNodeNameToInfo[node.GetName()].Clone() + + potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod} + + removePod := func(rp *v1.Pod) { + nodeInfoCopy.RemovePod(rp) + } + addPod := func(ap *v1.Pod) { + nodeInfoCopy.AddPod(ap) + } + // As the first step, remove all the lower priority pods from the node and + // check if the given pod can be scheduled. 
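+	// This mirrors the default scheduler's selectVictimsOnNode flow: evict all
+	// lower-priority pods from the cloned NodeInfo, check that the preemptor
+	// fits, then try to reprieve (re-add) victims one by one, keeping as
+	// victims only those whose re-addition breaks the fit.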
+ podPriority := util.GetPodPriority(pod) + for _, p := range nodeInfoCopy.Pods() { + if util.GetPodPriority(p) < podPriority { + potentialVictims.Items = append(potentialVictims.Items, p) + removePod(p) + } + } + potentialVictims.Sort() + + // If the new pod does not fit after removing all the lower priority pods, + // we are almost done and this node is not suitable for preemption. + if fits, _ := f.runPredicate(pod, nodeInfoCopy.Node()); !fits { + return nil, 0, false + } + + var victims []*v1.Pod + + // TODO(harry): handle PDBs in the future. + numViolatingVictim := 0 + + reprievePod := func(p *v1.Pod) bool { + addPod(p) + fits, _ := f.runPredicate(pod, nodeInfoCopy.Node()) + if !fits { + removePod(p) + victims = append(victims, p) + } + return fits + } + + // For now, assume all potential victims to be non-violating. + // Now we try to reprieve non-violating victims. + for _, p := range potentialVictims.Items { + reprievePod(p.(*v1.Pod)) + } + + return victims, numViolatingVictim, true +} + +func (f *FakeExtender) SupportsPreemption() bool { + // Assume preempt verb is always defined. + return true +} + +func (f *FakeExtender) ProcessPreemption( + pod *v1.Pod, + nodeToVictims map[*v1.Node]*schedulerapi.Victims, + nodeNameToInfo map[string]*schedulercache.NodeInfo, +) (map[*v1.Node]*schedulerapi.Victims, error) { + nodeToVictimsCopy := map[*v1.Node]*schedulerapi.Victims{} + // We don't want to change the original nodeToVictims + for k, v := range nodeToVictims { + nodeToVictimsCopy[k] = v + } + + for node, victims := range nodeToVictimsCopy { + // Try to do preemption on extender side. + extenderVictimPods, extendernPDBViolations, fits := f.selectVictimsOnNodeByExtender(pod, node) + // If it's unfit after extender's preemption, this node is unresolvable by preemption overall, + // let's remove it from potential preemption nodes. + if !fits { + delete(nodeToVictimsCopy, node) + } else { + // Append new victims to original victims + nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...) + nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + extendernPDBViolations + } + } + return nodeToVictimsCopy, nil +} + +// runPredicate run predicates of extender one by one for given pod and node. +// Returns: fits or not. 
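+// A predicate error aborts evaluation immediately and is returned to the
+// caller; otherwise evaluation short-circuits at the first predicate that
+// reports the pod does not fit.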
+func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) { + fits := true + var err error + for _, predicate := range f.predicates { + fits, err = predicate(pod, node) + if err != nil { + return false, err + } + if !fits { + break + } + } + return fits, nil } func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) { filtered := []*v1.Node{} failedNodesMap := schedulerapi.FailedNodesMap{} for _, node := range nodes { - fits := true - for _, predicate := range f.predicates { - fit, err := predicate(pod, node) - if err != nil { - return []*v1.Node{}, schedulerapi.FailedNodesMap{}, err - } - if !fit { - fits = false - break - } + fits, err := f.runPredicate(pod, node) + if err != nil { + return []*v1.Node{}, schedulerapi.FailedNodesMap{}, err } if fits { filtered = append(filtered, node) @@ -340,7 +460,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { } cache := schedulercache.New(time.Duration(0), wait.NeverStop) for _, name := range test.nodes { - cache.AddNode(&v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}) + cache.AddNode(createNode(name)) } queue := NewSchedulingQueue() scheduler := NewGenericScheduler( @@ -362,3 +482,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) { } } } + +func createNode(name string) *v1.Node { + return &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}} +} diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index f7e9cddfd23..6f3a9a323c1 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -54,13 +54,6 @@ type FitError struct { FailedPredicates FailedPredicateMap } -// Victims describes pod victims. -type Victims struct { - pods []*v1.Pod - numPDBViolations int -} - -// ErrNoNodesAvailable defines an error of no nodes available. var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") const ( @@ -234,34 +227,64 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, if err != nil { return nil, nil, nil, err } - nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, g.predicateMetaProducer, g.schedulingQueue, pdbs) + nodeToVictims, err := selectNodesForPreemption(pod, g.cachedNodeInfoMap, potentialNodes, g.predicates, + g.predicateMetaProducer, g.schedulingQueue, pdbs) if err != nil { return nil, nil, nil, err } - for len(nodeToVictims) > 0 { - node := pickOneNodeForPreemption(nodeToVictims) - if node == nil { - return nil, nil, nil, err - } - passes, pErr := nodePassesExtendersForPreemption(pod, node.Name, nodeToVictims[node].pods, g.cachedNodeInfoMap, g.extenders) - if passes && pErr == nil { - // Lower priority pods nominated to run on this node, may no longer fit on - // this node. So, we should remove their nomination. Removing their - // nomination updates these pods and moves them to the active queue. It - // lets scheduler find another place for them. - nominatedPods := g.getLowerPriorityNominatedPods(pod, node.Name) - return node, nodeToVictims[node].pods, nominatedPods, err - } - if pErr != nil { - glog.Errorf("Error occurred while checking extenders for preemption on node %v: %v", node, pErr) - } - // Remove the node from the map and try to pick a different node. - delete(nodeToVictims, node) + + // We will only check nodeToVictims with extenders that support preemption. 
+ // Extenders which do not support preemption may later prevent preemptor from being scheduled on the nominated + // node. In that case, scheduler will find a different host for the preemptor in subsequent scheduling cycles. + nodeToVictims, err = g.processPreemptionWithExtenders(pod, nodeToVictims) + if err != nil { + return nil, nil, nil, err + } + + candidateNode := pickOneNodeForPreemption(nodeToVictims) + if candidateNode == nil { + return nil, nil, nil, err + } + + // Lower priority pods nominated to run on this node, may no longer fit on + // this node. So, we should remove their nomination. Removing their + // nomination updates these pods and moves them to the active queue. It + // lets scheduler find another place for them. + nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name) + if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok { + return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err + } else { + return nil, nil, nil, fmt.Errorf( + "preemption failed: the target node %s has been deleted from scheduler cache", + candidateNode.Name) } - return nil, nil, nil, err } -// GetLowerPriorityNominatedPods returns pods whose priority is smaller than the +// processPreemptionWithExtenders processes preemption with extenders +func (g *genericScheduler) processPreemptionWithExtenders( + pod *v1.Pod, + nodeToVictims map[*v1.Node]*schedulerapi.Victims, +) (map[*v1.Node]*schedulerapi.Victims, error) { + if len(nodeToVictims) > 0 { + for _, extender := range g.extenders { + if extender.SupportsPreemption() { + var err error + // Replace nodeToVictims with result after preemption from extender. + if nodeToVictims, err = extender.ProcessPreemption(pod, nodeToVictims, g.cachedNodeInfoMap); err != nil { + return nil, err + } + // If node list is empty, no preemption will happen, skip other extenders. + if len(nodeToVictims) == 0 { + break + } + } + } + } + + return nodeToVictims, nil +} + +// getLowerPriorityNominatedPods returns pods whose priority is smaller than the // priority of the given "pod" and are nominated to run on the given node. // Note: We could possibly check if the nominated lower priority pods still fit // and return those that no longer fit, but that would require lots of @@ -270,6 +293,7 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, // small number of nominated pods per node. func (g *genericScheduler) getLowerPriorityNominatedPods(pod *v1.Pod, nodeName string) []*v1.Pod { pods := g.schedulingQueue.WaitingPodsForNode(nodeName) + if len(pods) == 0 { return nil } @@ -683,7 +707,7 @@ func EqualPriorityMap(_ *v1.Pod, _ interface{}, nodeInfo *schedulercache.NodeInf // 5. If there are still ties, the first such node is picked (sort of randomly). // The 'minNodes1' and 'minNodes2' are being reused here to save the memory // allocation and garbage collection time. -func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { +func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*schedulerapi.Victims) *v1.Node { if len(nodesToVictims) == 0 { return nil } @@ -691,14 +715,14 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { var minNodes1 []*v1.Node lenNodes1 := 0 for node, victims := range nodesToVictims { - if len(victims.pods) == 0 { + if len(victims.Pods) == 0 { // We found a node that doesn't need any preemption. Return it! 
// This should happen rarely when one or more pods are terminated between // the time that scheduler tries to schedule the pod and the time that // preemption logic tries to find nodes for preemption. return node } - numPDBViolatingPods := victims.numPDBViolations + numPDBViolatingPods := victims.NumPDBViolations if numPDBViolatingPods < minNumPDBViolatingPods { minNumPDBViolatingPods = numPDBViolatingPods minNodes1 = nil @@ -722,7 +746,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { node := minNodes1[i] victims := nodesToVictims[node] // highestPodPriority is the highest priority among the victims on this node. - highestPodPriority := util.GetPodPriority(victims.pods[0]) + highestPodPriority := util.GetPodPriority(victims.Pods[0]) if highestPodPriority < minHighestPriority { minHighestPriority = highestPodPriority lenNodes2 = 0 @@ -743,7 +767,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { for i := 0; i < lenNodes2; i++ { var sumPriorities int64 node := minNodes2[i] - for _, pod := range nodesToVictims[node].pods { + for _, pod := range nodesToVictims[node].Pods { // We add MaxInt32+1 to all priorities to make all of them >= 0. This is // needed so that a node with a few pods with negative priority is not // picked over a node with a smaller number of pods with the same negative @@ -769,7 +793,7 @@ func pickOneNodeForPreemption(nodesToVictims map[*v1.Node]*Victims) *v1.Node { lenNodes2 = 0 for i := 0; i < lenNodes1; i++ { node := minNodes1[i] - numPods := len(nodesToVictims[node].pods) + numPods := len(nodesToVictims[node].Pods) if numPods < minNumPods { minNumPods = numPods lenNodes2 = 0 @@ -797,9 +821,9 @@ func selectNodesForPreemption(pod *v1.Pod, metadataProducer algorithm.PredicateMetadataProducer, queue SchedulingQueue, pdbs []*policy.PodDisruptionBudget, -) (map[*v1.Node]*Victims, error) { +) (map[*v1.Node]*schedulerapi.Victims, error) { - nodeNameToVictims := map[*v1.Node]*Victims{} + nodeToVictims := map[*v1.Node]*schedulerapi.Victims{} var resultLock sync.Mutex // We can use the same metadata producer for all nodes. @@ -813,50 +837,16 @@ func selectNodesForPreemption(pod *v1.Pod, pods, numPDBViolations, fits := selectVictimsOnNode(pod, metaCopy, nodeNameToInfo[nodeName], predicates, queue, pdbs) if fits { resultLock.Lock() - victims := Victims{ - pods: pods, - numPDBViolations: numPDBViolations, + victims := schedulerapi.Victims{ + Pods: pods, + NumPDBViolations: numPDBViolations, } - nodeNameToVictims[potentialNodes[i]] = &victims + nodeToVictims[potentialNodes[i]] = &victims resultLock.Unlock() } } workqueue.Parallelize(16, len(potentialNodes), checkNode) - return nodeNameToVictims, nil -} - -func nodePassesExtendersForPreemption( - pod *v1.Pod, - nodeName string, - victims []*v1.Pod, - nodeNameToInfo map[string]*schedulercache.NodeInfo, - extenders []algorithm.SchedulerExtender) (bool, error) { - // If there are any extenders, run them and filter the list of candidate nodes. - if len(extenders) == 0 { - return true, nil - } - // Remove the victims from the corresponding nodeInfo and send nodes to the - // extenders for filtering. 
- originalNodeInfo := nodeNameToInfo[nodeName] - nodeInfoCopy := nodeNameToInfo[nodeName].Clone() - for _, victim := range victims { - nodeInfoCopy.RemovePod(victim) - } - nodeNameToInfo[nodeName] = nodeInfoCopy - defer func() { nodeNameToInfo[nodeName] = originalNodeInfo }() - filteredNodes := []*v1.Node{nodeInfoCopy.Node()} - for _, extender := range extenders { - var err error - var failedNodesMap map[string]string - filteredNodes, failedNodesMap, err = extender.Filter(pod, filteredNodes, nodeNameToInfo) - if err != nil { - return false, err - } - if _, found := failedNodesMap[nodeName]; found || len(filteredNodes) == 0 { - return false, nil - } - } - return true, nil + return nodeToVictims, nil } // filterPodsWithPDBViolation groups the given "pods" into two groups of "violatingPods" @@ -996,6 +986,7 @@ func nodesWherePreemptionMightHelp(pod *v1.Pod, nodes []*v1.Node, failedPredicat // (which is the case today), the !found case should never happen, but we'd prefer // to rely less on such assumptions in the code when checking does not impose // significant overhead. + // Also, we currently assume all failures returned by extender as resolvable. for _, failedPredicate := range failedPredicates { switch failedPredicate { case diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index 9df65890f29..eaca9831f75 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -660,11 +660,11 @@ func TestZeroRequest(t *testing.T) { } } -func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string { +func printNodeToVictims(nodeToVictims map[*v1.Node]*schedulerapi.Victims) string { var output string for node, victims := range nodeToVictims { output += node.Name + ": [" - for _, pod := range victims.pods { + for _, pod := range victims.Pods { output += pod.Name + ", " } output += "]" @@ -672,15 +672,15 @@ func printNodeToVictims(nodeToVictims map[*v1.Node]*Victims) string { return output } -func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*Victims) error { +func checkPreemptionVictims(testName string, expected map[string]map[string]bool, nodeToPods map[*v1.Node]*schedulerapi.Victims) error { if len(expected) == len(nodeToPods) { for k, victims := range nodeToPods { if expPods, ok := expected[k.Name]; ok { - if len(victims.pods) != len(expPods) { + if len(victims.Pods) != len(expPods) { return fmt.Errorf("test [%v]: unexpected number of pods. expected: %v, got: %v", testName, expected, printNodeToVictims(nodeToPods)) } prevPriority := int32(math.MaxInt32) - for _, p := range victims.pods { + for _, p := range victims.Pods { // Check that pods are sorted by their priority. if *p.Spec.Priority > prevPriority { return fmt.Errorf("test [%v]: pod %v of node %v was not sorted by priority", testName, p.Name, k) @@ -1284,11 +1284,20 @@ func TestPreempt(t *testing.T) { for _, pod := range test.pods { cache.AddPod(pod) } + cachedNodeInfoMap := map[string]*schedulercache.NodeInfo{} for _, name := range nodeNames { - cache.AddNode(makeNode(name, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5)) + node := makeNode(name, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5) + cache.AddNode(node) + + // Set nodeInfo to extenders to mock extenders' cache for preemption. 
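+			// Each cached NodeInfo below holds only the node object (via
+			// SetNode); it stands in for the node state a nodeCacheCapable
+			// extender would keep on its own side.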
+ cachedNodeInfo := schedulercache.NewNodeInfo() + cachedNodeInfo.SetNode(node) + cachedNodeInfoMap[name] = cachedNodeInfo } extenders := []algorithm.SchedulerExtender{} for _, extender := range test.extenders { + // Set nodeInfoMap as extenders cached node information. + extender.cachedNodeNameToInfo = cachedNodeInfoMap extenders = append(extenders, extender) } scheduler := NewGenericScheduler( diff --git a/test/integration/scheduler/extender_test.go b/test/integration/scheduler/extender_test.go index 38369f960d5..6bb0154c82a 100644 --- a/test/integration/scheduler/extender_test.go +++ b/test/integration/scheduler/extender_test.go @@ -130,7 +130,7 @@ func (e *Extender) filterUsingNodeCache(args *schedulerapi.ExtenderArgs) (*sched for _, nodeName := range *args.NodeNames { fits := true for _, predicate := range e.predicates { - fit, err := predicate(&args.Pod, + fit, err := predicate(args.Pod, &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}}) if err != nil { return &schedulerapi.ExtenderFilterResult{ @@ -169,7 +169,7 @@ func (e *Extender) Filter(args *schedulerapi.ExtenderArgs) (*schedulerapi.Extend for _, node := range args.Nodes.Items { fits := true for _, predicate := range e.predicates { - fit, err := predicate(&args.Pod, &node) + fit, err := predicate(args.Pod, &node) if err != nil { return &schedulerapi.ExtenderFilterResult{ Nodes: &v1.NodeList{}, @@ -217,7 +217,7 @@ func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.Ho continue } priorityFunc := prioritizer.function - prioritizedList, err := priorityFunc(&args.Pod, nodes) + prioritizedList, err := priorityFunc(args.Pod, nodes) if err != nil { return &schedulerapi.HostPriorityList{}, err } From b62d82422db8f5f52a51f1e339e52d7439b4522b Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Tue, 27 Feb 2018 11:06:24 -0800 Subject: [PATCH 2/5] Fix golints in extender --- pkg/scheduler/core/extender.go | 9 +++++---- pkg/scheduler/core/generic_scheduler.go | 9 +++++---- pkg/scheduler/core/generic_scheduler_test.go | 4 ++-- 3 files changed, 12 insertions(+), 10 deletions(-) diff --git a/pkg/scheduler/core/extender.go b/pkg/scheduler/core/extender.go index 14bd455cc03..f0071ececed 100644 --- a/pkg/scheduler/core/extender.go +++ b/pkg/scheduler/core/extender.go @@ -184,18 +184,19 @@ func (h *HTTPExtender) restorePodFromNodeInfo( metaPod *schedulerapi.MetaPod, nodeName string, nodeNameToInfo map[string]*schedulercache.NodeInfo) (*v1.Pod, error) { + var nodeInfo *schedulercache.NodeInfo if nodeInfo, ok := nodeNameToInfo[nodeName]; ok { for _, pod := range nodeInfo.Pods() { if string(pod.UID) == metaPod.UID { return pod, nil } } - return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node.", + return nil, fmt.Errorf("extender: %v claims to preempt pod (UID: %v) on node: %v, but the pod is not found on that node", h.extenderURL, metaPod, nodeInfo.Node().Name) - } else { - return nil, fmt.Errorf("extender: %v claims to preempt on node: %v but the node is not found in nodeNameToInfo map", - h.extenderURL, nodeInfo.Node().Name) } + + return nil, fmt.Errorf("extender: %v claims to preempt on node: %v but the node is not found in nodeNameToInfo map", + h.extenderURL, nodeInfo.Node().Name) } // convertToNodeNameToMetaVictims converts from struct type to meta types. 
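A minimal sketch of the extender side of the new preempt verb, assuming a nodeCacheCapable extender served with plain net/http; the listen address and URL path here are illustrative, and this no-op policy simply accepts every candidate node and victim set the scheduler proposes:

package main

import (
	"encoding/json"
	"log"
	"net/http"

	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api/v1"
)

// preemptHandler decodes ExtenderPreemptionArgs and answers with an
// ExtenderPreemptionResult keyed by node name, as the scheduler expects.
func preemptHandler(w http.ResponseWriter, r *http.Request) {
	var args schedulerapi.ExtenderPreemptionArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	// For a nodeCacheCapable extender the scheduler fills NodeNameToMetaVictims
	// (pod UIDs only); echoing it back approves the proposed preemptions.
	result := schedulerapi.ExtenderPreemptionResult{
		NodeNameToMetaVictims: args.NodeNameToMetaVictims,
	}
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(&result); err != nil {
		log.Printf("failed to encode preemption result: %v", err)
	}
}

func main() {
	http.HandleFunc("/extender/preempt", preemptHandler)
	log.Fatal(http.ListenAndServe(":8081", nil))
}

A real extender would instead prune nodes it cannot preempt on and may return a different victim set per node, which are exactly the two kinds of changes the ProcessPreemption contract above allows.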
diff --git a/pkg/scheduler/core/generic_scheduler.go b/pkg/scheduler/core/generic_scheduler.go index 6f3a9a323c1..f86ce887ef2 100644 --- a/pkg/scheduler/core/generic_scheduler.go +++ b/pkg/scheduler/core/generic_scheduler.go @@ -54,6 +54,7 @@ type FitError struct { FailedPredicates FailedPredicateMap } +// ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods. var ErrNoNodesAvailable = fmt.Errorf("no nodes available to schedule pods") const ( @@ -253,11 +254,11 @@ func (g *genericScheduler) Preempt(pod *v1.Pod, nodeLister algorithm.NodeLister, nominatedPods := g.getLowerPriorityNominatedPods(pod, candidateNode.Name) if nodeInfo, ok := g.cachedNodeInfoMap[candidateNode.Name]; ok { return nodeInfo.Node(), nodeToVictims[candidateNode].Pods, nominatedPods, err - } else { - return nil, nil, nil, fmt.Errorf( - "preemption failed: the target node %s has been deleted from scheduler cache", - candidateNode.Name) } + + return nil, nil, nil, fmt.Errorf( + "preemption failed: the target node %s has been deleted from scheduler cache", + candidateNode.Name) } // processPreemptionWithExtenders processes preemption with extenders diff --git a/pkg/scheduler/core/generic_scheduler_test.go b/pkg/scheduler/core/generic_scheduler_test.go index eaca9831f75..ffb08932280 100644 --- a/pkg/scheduler/core/generic_scheduler_test.go +++ b/pkg/scheduler/core/generic_scheduler_test.go @@ -883,7 +883,7 @@ func TestSelectNodesForPreemption(t *testing.T) { for _, test := range tests { nodes := []*v1.Node{} for _, n := range test.nodes { - node := makeNode(n, priorityutil.DefaultMilliCPURequest*5, priorityutil.DefaultMemoryRequest*5) + node := makeNode(n, 1000*5, priorityutil.DefaultMemoryRequest*5) node.ObjectMeta.Labels = map[string]string{"hostname": node.Name} nodes = append(nodes, node) } @@ -1286,7 +1286,7 @@ func TestPreempt(t *testing.T) { } cachedNodeInfoMap := map[string]*schedulercache.NodeInfo{} for _, name := range nodeNames { - node := makeNode(name, priorityutil.DefaultMilliCpuRequest*5, priorityutil.DefaultMemoryRequest*5) + node := makeNode(name, 1000*5, priorityutil.DefaultMemoryRequest*5) cache.AddNode(node) // Set nodeInfo to extenders to mock extenders' cache for preemption. From 009699fd22fdcc016a35ff8fa451a1c4b468c7bb Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Mon, 26 Feb 2018 23:10:52 -0800 Subject: [PATCH 3/5] Update generated types --- pkg/scheduler/api/v1/zz_generated.deepcopy.go | 51 ++++++++----------- pkg/scheduler/api/zz_generated.deepcopy.go | 51 ++++++++----------- 2 files changed, 44 insertions(+), 58 deletions(-) diff --git a/pkg/scheduler/api/v1/zz_generated.deepcopy.go b/pkg/scheduler/api/v1/zz_generated.deepcopy.go index 77efc9a175a..026726ab4f4 100644 --- a/pkg/scheduler/api/v1/zz_generated.deepcopy.go +++ b/pkg/scheduler/api/v1/zz_generated.deepcopy.go @@ -196,28 +196,6 @@ func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) { - { - in := &in - *out = make(FailedNodesMap, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - return - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FailedNodesMap. 
-func (in FailedNodesMap) DeepCopy() FailedNodesMap { - if in == nil { - return nil - } - out := new(FailedNodesMap) - in.DeepCopyInto(out) - return *out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) { *out = *in @@ -277,13 +255,6 @@ func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult) } } } - if in.NodeToVictims != nil { - in, out := &in.NodeToVictims, &out.NodeToVictims - *out = make(map[*core_v1.Node]*Victims, len(*in)) - for range *in { - // FIXME: Copying unassignable keys unsupported *core_v1.Node - } - } return } @@ -297,6 +268,28 @@ func (in *ExtenderPreemptionResult) DeepCopy() *ExtenderPreemptionResult { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) { + { + in := &in + *out = make(FailedNodesMap, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + return + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FailedNodesMap. +func (in FailedNodesMap) DeepCopy() FailedNodesMap { + if in == nil { + return nil + } + out := new(FailedNodesMap) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HostPriority) DeepCopyInto(out *HostPriority) { *out = *in diff --git a/pkg/scheduler/api/zz_generated.deepcopy.go b/pkg/scheduler/api/zz_generated.deepcopy.go index b765a201d48..24bd25d5ef1 100644 --- a/pkg/scheduler/api/zz_generated.deepcopy.go +++ b/pkg/scheduler/api/zz_generated.deepcopy.go @@ -196,28 +196,6 @@ func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource { return out } -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) { - { - in := &in - *out = make(FailedNodesMap, len(*in)) - for key, val := range *in { - (*out)[key] = val - } - return - } -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FailedNodesMap. -func (in FailedNodesMap) DeepCopy() FailedNodesMap { - if in == nil { - return nil - } - out := new(FailedNodesMap) - in.DeepCopyInto(out) - return *out -} - // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) { *out = *in @@ -277,13 +255,6 @@ func (in *ExtenderPreemptionResult) DeepCopyInto(out *ExtenderPreemptionResult) } } } - if in.NodeToVictims != nil { - in, out := &in.NodeToVictims, &out.NodeToVictims - *out = make(map[*v1.Node]*Victims, len(*in)) - for range *in { - // FIXME: Copying unassignable keys unsupported *v1.Node - } - } return } @@ -297,6 +268,28 @@ func (in *ExtenderPreemptionResult) DeepCopy() *ExtenderPreemptionResult { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) { + { + in := &in + *out = make(FailedNodesMap, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + return + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new FailedNodesMap. 
+func (in FailedNodesMap) DeepCopy() FailedNodesMap { + if in == nil { + return nil + } + out := new(FailedNodesMap) + in.DeepCopyInto(out) + return *out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *HostPriority) DeepCopyInto(out *HostPriority) { *out = *in From ea5f0b1de2b90523e3a9bd67acb994c9f8553ffd Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Tue, 27 Feb 2018 14:47:00 -0800 Subject: [PATCH 4/5] Remove example change to seperate repo --- examples/scheduler-policy-config-with-extender.json | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/scheduler-policy-config-with-extender.json b/examples/scheduler-policy-config-with-extender.json index fb5e1381353..cc1a9bd6fbd 100644 --- a/examples/scheduler-policy-config-with-extender.json +++ b/examples/scheduler-policy-config-with-extender.json @@ -20,7 +20,6 @@ "filterVerb": "filter", "bindVerb": "bind", "prioritizeVerb": "prioritize", - "preemptVerb": "preempt", "weight": 5, "enableHttps": false, "nodeCacheCapable": false From 7a7f9dccd04dc7fc86983b7644ca6c857fd57d49 Mon Sep 17 00:00:00 2001 From: Harry Zhang Date: Fri, 2 Mar 2018 00:34:55 -0800 Subject: [PATCH 5/5] [PATCH] Use nodename as key --- pkg/scheduler/api/types.go | 4 +- pkg/scheduler/api/v1/types.go | 6 +- pkg/scheduler/api/v1/zz_generated.deepcopy.go | 15 ++-- pkg/scheduler/api/zz_generated.deepcopy.go | 15 ++-- pkg/scheduler/core/extender.go | 30 +++++--- pkg/scheduler/core/extender_test.go | 71 ++++++++++--------- pkg/scheduler/factory/factory_test.go | 23 +++++- 7 files changed, 105 insertions(+), 59 deletions(-) diff --git a/pkg/scheduler/api/types.go b/pkg/scheduler/api/types.go index de985d9c822..242d74e1eef 100644 --- a/pkg/scheduler/api/types.go +++ b/pkg/scheduler/api/types.go @@ -212,8 +212,8 @@ type ExtenderPreemptionArgs struct { // Pod being scheduled Pod *v1.Pod // Victims map generated by scheduler preemption phase - // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeToVictims. - NodeToVictims map[*v1.Node]*Victims + // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeNameToVictims. + NodeNameToVictims map[string]*Victims NodeNameToMetaVictims map[string]*MetaVictims } diff --git a/pkg/scheduler/api/v1/types.go b/pkg/scheduler/api/v1/types.go index e1722cc9caf..80ad839a039 100644 --- a/pkg/scheduler/api/v1/types.go +++ b/pkg/scheduler/api/v1/types.go @@ -199,9 +199,9 @@ type ExtenderPreemptionArgs struct { // Pod being scheduled Pod *apiv1.Pod `json:"pod"` // Victims map generated by scheduler preemption phase - // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeToVictims. - NodeToVictims map[*apiv1.Node]*Victims `json:"nodeToVictims,omitempty"` - NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"` + // Only set NodeNameToMetaVictims if ExtenderConfig.NodeCacheCapable == true. Otherwise, only set NodeNameToVictims. 
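+	// Keying victims by node name (instead of *apiv1.Node) keeps this map
+	// serializable and copyable; the earlier map[*apiv1.Node]*Victims form
+	// left deepcopy-gen unable to copy the keys (the "Copying unassignable
+	// keys unsupported" FIXME in the generated code above).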
+	NodeNameToVictims     map[string]*Victims     `json:"nodeToVictims,omitempty"`
+	NodeNameToMetaVictims map[string]*MetaVictims `json:"nodeNameToMetaVictims,omitempty"`
 }
 
 // Victims represents:
diff --git a/pkg/scheduler/api/v1/zz_generated.deepcopy.go b/pkg/scheduler/api/v1/zz_generated.deepcopy.go
index 026726ab4f4..155d3983b9b 100644
--- a/pkg/scheduler/api/v1/zz_generated.deepcopy.go
+++ b/pkg/scheduler/api/v1/zz_generated.deepcopy.go
@@ -208,11 +208,16 @@ func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) {
 			(*in).DeepCopyInto(*out)
 		}
 	}
-	if in.NodeToVictims != nil {
-		in, out := &in.NodeToVictims, &out.NodeToVictims
-		*out = make(map[*core_v1.Node]*Victims, len(*in))
-		for range *in {
-			// FIXME: Copying unassignable keys unsupported *core_v1.Node
+	if in.NodeNameToVictims != nil {
+		in, out := &in.NodeNameToVictims, &out.NodeNameToVictims
+		*out = make(map[string]*Victims, len(*in))
+		for key, val := range *in {
+			if val == nil {
+				(*out)[key] = nil
+			} else {
+				(*out)[key] = new(Victims)
+				val.DeepCopyInto((*out)[key])
+			}
 		}
 	}
 	if in.NodeNameToMetaVictims != nil {
diff --git a/pkg/scheduler/api/zz_generated.deepcopy.go b/pkg/scheduler/api/zz_generated.deepcopy.go
index 24bd25d5ef1..6299831dbc7 100644
--- a/pkg/scheduler/api/zz_generated.deepcopy.go
+++ b/pkg/scheduler/api/zz_generated.deepcopy.go
@@ -208,11 +208,16 @@ func (in *ExtenderPreemptionArgs) DeepCopyInto(out *ExtenderPreemptionArgs) {
 			(*in).DeepCopyInto(*out)
 		}
 	}
-	if in.NodeToVictims != nil {
-		in, out := &in.NodeToVictims, &out.NodeToVictims
-		*out = make(map[*v1.Node]*Victims, len(*in))
-		for range *in {
-			// FIXME: Copying unassignable keys unsupported *v1.Node
+	if in.NodeNameToVictims != nil {
+		in, out := &in.NodeNameToVictims, &out.NodeNameToVictims
+		*out = make(map[string]*Victims, len(*in))
+		for key, val := range *in {
+			if val == nil {
+				(*out)[key] = nil
+			} else {
+				(*out)[key] = new(Victims)
+				val.DeepCopyInto((*out)[key])
+			}
 		}
 	}
 	if in.NodeNameToMetaVictims != nil {
diff --git a/pkg/scheduler/core/extender.go b/pkg/scheduler/core/extender.go
index f0071ececed..d7b5efc7cf5 100644
--- a/pkg/scheduler/core/extender.go
+++ b/pkg/scheduler/core/extender.go
@@ -128,15 +128,16 @@ func (h *HTTPExtender) ProcessPreemption(
 
 	if h.nodeCacheCapable {
 		// If extender has cached node info, pass NodeNameToMetaVictims in args.
-		nodeNameToVictims := convertToNodeNameToMetaVictims(nodeToVictims)
+		nodeNameToMetaVictims := convertToNodeNameToMetaVictims(nodeToVictims)
 		args = &schedulerapi.ExtenderPreemptionArgs{
 			Pod:                   pod,
-			NodeNameToMetaVictims: nodeNameToVictims,
+			NodeNameToMetaVictims: nodeNameToMetaVictims,
 		}
 	} else {
+		nodeNameToVictims := convertToNodeNameToVictims(nodeToVictims)
 		args = &schedulerapi.ExtenderPreemptionArgs{
-			Pod:           pod,
-			NodeToVictims: nodeToVictims,
+			Pod:               pod,
+			NodeNameToVictims: nodeNameToVictims,
 		}
 	}
 
@@ -151,10 +152,10 @@ func (h *HTTPExtender) ProcessPreemption(
 		return nil, err
 	}
 	return nodeToVictims, nil
-
 }
 
-// convertToNodeToVictims converts from meta types to struct type.
+// convertToNodeToVictims converts "nodeNameToMetaVictims" from object identifiers,
+// such as UIDs and names, to object pointers.
 func (h *HTTPExtender) convertToNodeToVictims(
 	nodeNameToMetaVictims map[string]*schedulerapi.MetaVictims,
 	nodeNameToInfo map[string]*schedulercache.NodeInfo,
@@ -165,7 +166,7 @@ func (h *HTTPExtender) convertToNodeToVictims(
 			Pods: []*v1.Pod{},
 		}
 		for _, metaPod := range metaVictims.Pods {
-			pod, err := h.restorePodFromNodeInfo(metaPod, nodeName, nodeNameToInfo)
+			pod, err := h.convertPodUIDToPod(metaPod, nodeName, nodeNameToInfo)
 			if err != nil {
 				return nil, err
 			}
@@ -176,11 +177,11 @@ func (h *HTTPExtender) convertToNodeToVictims(
 	return nodeToVictims, nil
 }
 
-// restorePodFromNodeInfo returns v1.Pod object for given MetaPod and node name.
+// convertPodUIDToPod returns the v1.Pod object for the given MetaPod and node name.
 // The v1.Pod object is restored by nodeInfo.Pods().
 // It should return an error if there is a cache inconsistency between the default scheduler
 // and the extender, i.e. if this pod or node is missing from nodeNameToInfo.
-func (h *HTTPExtender) restorePodFromNodeInfo(
+func (h *HTTPExtender) convertPodUIDToPod(
 	metaPod *schedulerapi.MetaPod,
 	nodeName string,
 	nodeNameToInfo map[string]*schedulercache.NodeInfo) (*v1.Pod, error) {
@@ -219,6 +220,17 @@ func convertToNodeNameToMetaVictims(
 	return nodeNameToVictims
 }
 
+// convertToNodeNameToVictims converts the keys of the victims map from *v1.Node to node name.
+func convertToNodeNameToVictims(
+	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
+) map[string]*schedulerapi.Victims {
+	nodeNameToVictims := map[string]*schedulerapi.Victims{}
+	for node, victims := range nodeToVictims {
+		nodeNameToVictims[node.GetName()] = victims
+	}
+	return nodeNameToVictims
+}
+
 // Filter based on extender implemented predicate functions. The filtered list is
 // expected to be a subset of the supplied list. failedNodesMap optionally contains
 // the list of failed nodes and failure reasons.
diff --git a/pkg/scheduler/core/extender_test.go b/pkg/scheduler/core/extender_test.go
index 89bef4d8a25..6064ba04e3c 100644
--- a/pkg/scheduler/core/extender_test.go
+++ b/pkg/scheduler/core/extender_test.go
@@ -119,6 +119,42 @@ type FakeExtender struct {
 	cachedPDBs []*policy.PodDisruptionBudget
 }
 
+func (f *FakeExtender) SupportsPreemption() bool {
+	// Assume the preempt verb is always defined.
+	return true
+}
+
+func (f *FakeExtender) ProcessPreemption(
+	pod *v1.Pod,
+	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
+	nodeNameToInfo map[string]*schedulercache.NodeInfo,
+) (map[*v1.Node]*schedulerapi.Victims, error) {
+	nodeToVictimsCopy := map[*v1.Node]*schedulerapi.Victims{}
+	// We don't want to change the original nodeToVictims.
+	for k, v := range nodeToVictims {
+		// In a real-world implementation, the extender is expected to have its own way of
+		// getting node objects by name if needed (e.g. by querying kube-apiserver).
+		//
+		// For test purposes, we just use the node from the parameters directly.
+		nodeToVictimsCopy[k] = v
+	}
+
+	for node, victims := range nodeToVictimsCopy {
+		// Try to do preemption on the extender side.
+		extenderVictimPods, extenderPDBViolations, fits := f.selectVictimsOnNodeByExtender(pod, node, nodeNameToInfo)
+		// If the pod still does not fit after the extender's preemption, this node cannot be
+		// made schedulable by preemption at all, so remove it from the potential preemption nodes.
+		if !fits {
+			delete(nodeToVictimsCopy, node)
+		} else {
+			// Append the new victims to the original victims.
+			nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...)
+			nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + extenderPDBViolations
+		}
+	}
+	return nodeToVictimsCopy, nil
+}
+
 // selectVictimsOnNodeByExtender checks the given nodes->pods map with predicates on the extender's side.
 // Returns:
 // 1. More victim pods (if any) amended by the extender's preemption phase.
@@ -127,6 +163,7 @@ type FakeExtender struct {
 func (f *FakeExtender) selectVictimsOnNodeByExtender(
 	pod *v1.Pod,
 	node *v1.Node,
+	nodeNameToInfo map[string]*schedulercache.NodeInfo,
 ) ([]*v1.Pod, int, bool) {
 	// TODO(harry): add more tests in generic_scheduler_test.go to verify this logic.
 	// If an extender supports preemption but has no cached node info, let's run filter to make sure
@@ -139,7 +176,7 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(
 	}
 
 	// Otherwise, as the extender supports preemption and has cached node info, we assume cachedNodeNameToInfo is available
-	// and get cached node info by given nodeName.
+	// and get the cached node info by the given node name.
 	nodeInfoCopy := f.cachedNodeNameToInfo[node.GetName()].Clone()
 
 	potentialVictims := util.SortableList{CompFunc: util.HigherPriorityPod}
@@ -191,38 +228,6 @@ func (f *FakeExtender) selectVictimsOnNodeByExtender(
 	return victims, numViolatingVictim, true
 }
 
-func (f *FakeExtender) SupportsPreemption() bool {
-	// Assume preempt verb is always defined.
-	return true
-}
-
-func (f *FakeExtender) ProcessPreemption(
-	pod *v1.Pod,
-	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
-	nodeNameToInfo map[string]*schedulercache.NodeInfo,
-) (map[*v1.Node]*schedulerapi.Victims, error) {
-	nodeToVictimsCopy := map[*v1.Node]*schedulerapi.Victims{}
-	// We don't want to change the original nodeToVictims
-	for k, v := range nodeToVictims {
-		nodeToVictimsCopy[k] = v
-	}
-
-	for node, victims := range nodeToVictimsCopy {
-		// Try to do preemption on extender side.
-		extenderVictimPods, extendernPDBViolations, fits := f.selectVictimsOnNodeByExtender(pod, node)
-		// If it's unfit after extender's preemption, this node is unresolvable by preemption overall,
-		// let's remove it from potential preemption nodes.
-		if !fits {
-			delete(nodeToVictimsCopy, node)
-		} else {
-			// Append new victims to original victims
-			nodeToVictimsCopy[node].Pods = append(victims.Pods, extenderVictimPods...)
-			nodeToVictimsCopy[node].NumPDBViolations = victims.NumPDBViolations + extendernPDBViolations
-		}
-	}
-	return nodeToVictimsCopy, nil
-}
-
 // runPredicate runs the extender's predicates one by one for the given pod and node.
 // Returns: fits or not.
 func (f *FakeExtender) runPredicate(pod *v1.Pod, node *v1.Node) (bool, error) {
diff --git a/pkg/scheduler/factory/factory_test.go b/pkg/scheduler/factory/factory_test.go
index 1ddce5808f3..924e48bb4d3 100644
--- a/pkg/scheduler/factory/factory_test.go
+++ b/pkg/scheduler/factory/factory_test.go
@@ -541,11 +541,30 @@ type fakeExtender struct {
 	interestedPodName string
 }
 
-func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) {
+func (f *fakeExtender) ProcessPreemption(
+	pod *v1.Pod,
+	nodeToVictims map[*v1.Node]*schedulerapi.Victims,
+	nodeNameToInfo map[string]*schedulercache.NodeInfo,
+) (map[*v1.Node]*schedulerapi.Victims, error) {
+	return nil, nil
+}
+
+func (f *fakeExtender) SupportsPreemption() bool {
+	return false
+}
+
+func (f *fakeExtender) Filter(
+	pod *v1.Pod,
+	nodes []*v1.Node,
+	nodeNameToInfo map[string]*schedulercache.NodeInfo,
+) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) {
 	return nil, nil, nil
 }
 
-func (f *fakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) {
+func (f *fakeExtender) Prioritize(
+	pod *v1.Pod,
+	nodes []*v1.Node,
+) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) {
 	return nil, 0, nil
 }
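
Editor's note for extender authors: the sketch below is illustrative only and is not part of the patch series. It shows what the server side of the new preempt verb could look like for a nodeCacheCapable extender, which receives nodeNameToMetaVictims (node names and pod UIDs only) and returns an ExtenderPreemptionResult. The URL path, port, the "protected-node" name, and the locally mirrored structs are assumptions made for this sketch; a real extender should import the scheduler's v1 extender API types from pkg/scheduler/api/v1 and use the JSON tags exactly as declared there.

// preempt_extender_sketch.go — a minimal, hypothetical HTTP scheduler extender
// serving the preempt verb. Field names mirror the v1 API types in this series.
package main

import (
	"encoding/json"
	"log"
	"net/http"
)

// Local mirrors of the v1 API types, reduced to what this sketch needs.
type metaPod struct {
	UID string `json:"uid"`
}

type metaVictims struct {
	Pods             []*metaPod `json:"pods"`
	NumPDBViolations int        `json:"numPDBViolations"`
}

// Only the nodeCacheCapable == true path is modeled, so only
// nodeNameToMetaVictims is read from the arguments.
type preemptionArgs struct {
	NodeNameToMetaVictims map[string]*metaVictims `json:"nodeNameToMetaVictims,omitempty"`
}

type preemptionResult struct {
	NodeNameToMetaVictims map[string]*metaVictims `json:"nodeNameToMetaVictims"`
}

// preemptHandler filters the candidate nodes chosen by the default scheduler.
// Dropping a node from the returned map tells the scheduler that preemption
// cannot make the pod schedulable there; returned victims replace the input,
// echoing the pass-through behavior of the FakeExtender in the tests above.
func preemptHandler(w http.ResponseWriter, r *http.Request) {
	var args preemptionArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	result := preemptionResult{NodeNameToMetaVictims: map[string]*metaVictims{}}
	for nodeName, victims := range args.NodeNameToMetaVictims {
		// "protected-node" is a made-up node this extender refuses to preempt on.
		if nodeName == "protected-node" {
			continue
		}
		// Pass the scheduler's proposed victims through unchanged.
		result.NodeNameToMetaVictims[nodeName] = victims
	}
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(&result); err != nil {
		log.Printf("failed to encode preemption result: %v", err)
	}
}

func main() {
	// The route must match URLPrefix + PreemptVerb from the scheduler policy,
	// e.g. "urlPrefix": "http://127.0.0.1:8888/scheduler", "preemptVerb": "preempt".
	http.HandleFunc("/scheduler/preempt", preemptHandler)
	log.Fatal(http.ListenAndServe(":8888", nil))
}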