Merge pull request #60332 from yguo0905/sched
Automatic merge from submit-queue (batch tested with PRs 60236, 60332, 57375, 60451, 57408). If you want to cherry-pick this change to another branch, please follow the instructions at https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md.

kube-scheduler: Support extender managed extended resources in kube-scheduler

**What this PR does / why we need it**:

This is the continuation of https://github.com/kubernetes/kubernetes/pull/58851.

- This PR adds new extender configurations to the scheduler policy config.
- A set of extended resource names can be specified in an extender config; these are the resources managed by that extender. The scheduler sends a pod to the extender only if the pod requests at least one of the extended resources in the set.
- An `IgnoredByScheduler` flag can be set alongside each such resource. If the flag is true, the scheduler does not check the resource in the `PodFitsResources` predicate.
- This PR also changes the default behavior of the `PodFitsResources` predicate: by default, it now ignores extended resources that are not present in node status. This is required to support extender managed extended resources (including cluster-level resources) on a node. Note that kube-scheduler overrides this default and does not ignore such missing extended resources.

**Which issue(s) this PR fixes**:
Fixes https://github.com/kubernetes/kubernetes/issues/53616
Fixes https://github.com/kubernetes/kubernetes/issues/58850

**Release note**:

```
Support extender managed extended resources in kube-scheduler
```
Commit 1aee9fd9ff
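For orientation, here is a minimal sketch (not part of this PR's diff) of the new policy configuration built with the Go types added below; the extender endpoint and the resource name `example.com/foo` are hypothetical:

```go
package main

import (
	"fmt"

	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
)

func main() {
	policy := schedulerapi.Policy{
		ExtenderConfigs: []schedulerapi.ExtenderConfig{{
			URLPrefix:  "http://127.0.0.1:8888/extender", // hypothetical extender endpoint
			FilterVerb: "filter",
			// The scheduler sends a pod to this extender only if the pod
			// requests example.com/foo; with IgnoredByScheduler set, the
			// PodFitsResources predicate skips that resource and leaves its
			// accounting entirely to the extender.
			ManagedResources: []schedulerapi.ExtenderManagedResource{{
				Name:               "example.com/foo", // hypothetical extended resource
				IgnoredByScheduler: true,
			}},
		}},
	}
	fmt.Printf("%+v\n", policy)
}
```

With such a config, pods that do not request `example.com/foo` are never sent to the extender at all.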
@@ -606,8 +606,16 @@ func (kl *Kubelet) setNodeStatusMachineInfo(node *v1.Node) {
 	}
 	for _, removedResource := range removedDevicePlugins {
-		glog.V(2).Infof("Remove capacity for %s", removedResource)
-		delete(node.Status.Capacity, v1.ResourceName(removedResource))
+		glog.V(2).Infof("Set capacity for %s to 0 on device removal", removedResource)
+		// Set the capacity of the removed resource to 0 instead of
+		// removing the resource from the node status. This is to indicate
+		// that the resource is managed by device plugin and had been
+		// registered before.
+		//
+		// This is required to differentiate the device plugin managed
+		// resources and the cluster-level resources, which are absent in
+		// node status.
+		node.Status.Capacity[v1.ResourceName(removedResource)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
 	}
 }
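A quick sketch of the distinction the comment above draws, using hypothetical resource names: a removed device-plugin resource keeps a zero-quantity entry in node status, while a cluster-level resource has no entry at all.

```go
package main

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// After device removal, the device-plugin resource stays in capacity
	// with quantity 0; a cluster-level resource never appears in it.
	capacity := v1.ResourceList{
		"example.com/gpu": *resource.NewQuantity(0, resource.DecimalSI), // hypothetical device plugin resource
	}
	_, fromDevicePlugin := capacity["example.com/gpu"]     // true: registered before, now 0
	_, clusterLevel := capacity["example.com/cluster-res"] // false: unknown to the node
	fmt.Println(fromDevicePlugin, clusterLevel)
}
```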
@@ -583,8 +583,10 @@ func TestHandlePluginResources(t *testing.T) {
 	kl := testKubelet.kubelet

 	adjustedResource := v1.ResourceName("domain1.com/adjustedResource")
-	unadjustedResouce := v1.ResourceName("domain2.com/unadjustedResouce")
+	emptyResource := v1.ResourceName("domain2.com/emptyResource")
+	missingResource := v1.ResourceName("domain2.com/missingResource")
 	failedResource := v1.ResourceName("domain2.com/failedResource")
+	resourceQuantity0 := *resource.NewQuantity(int64(0), resource.DecimalSI)
 	resourceQuantity1 := *resource.NewQuantity(int64(1), resource.DecimalSI)
 	resourceQuantity2 := *resource.NewQuantity(int64(2), resource.DecimalSI)
 	resourceQuantityInvalid := *resource.NewQuantity(int64(-1), resource.DecimalSI)
@@ -592,9 +594,9 @@ func TestHandlePluginResources(t *testing.T) {
 	nodes := []*v1.Node{
 		{ObjectMeta: metav1.ObjectMeta{Name: testKubeletHostname},
 			Status: v1.NodeStatus{Capacity: v1.ResourceList{}, Allocatable: v1.ResourceList{
-				adjustedResource:  resourceQuantity1,
-				unadjustedResouce: resourceQuantity1,
-				v1.ResourcePods:   allowedPodQuantity,
+				adjustedResource: resourceQuantity1,
+				emptyResource:    resourceQuantity0,
+				v1.ResourcePods:  allowedPodQuantity,
			}}},
 	}
 	kl.nodeInfo = testNodeInfo{nodes: nodes}
@@ -607,6 +609,7 @@ func TestHandlePluginResources(t *testing.T) {
 	// quantity unchanged.
 	updateResourceMap := map[v1.ResourceName]resource.Quantity{
 		adjustedResource: resourceQuantity2,
+		emptyResource:    resourceQuantity0,
 		failedResource:   resourceQuantityInvalid,
 	}
 	pod := attrs.Pod
@@ -634,7 +637,7 @@ func TestHandlePluginResources(t *testing.T) {

 	// pod requiring adjustedResource can be successfully allocated because updatePluginResourcesFunc
 	// adjusts node.allocatableResource for this resource to a sufficient value.
-	fittingPodspec := v1.PodSpec{NodeName: string(kl.nodeName),
+	fittingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
 		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
 			Limits: v1.ResourceList{
 				adjustedResource: resourceQuantity2,
@@ -644,14 +647,30 @@ func TestHandlePluginResources(t *testing.T) {
 			},
 		}}},
 	}
-	// pod requiring unadjustedResouce with insufficient quantity will still fail PredicateAdmit.
-	exceededPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
+	// pod requiring emptyResource (extended resources with 0 allocatable) will
+	// not pass PredicateAdmit.
+	emptyPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
 		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
 			Limits: v1.ResourceList{
-				unadjustedResouce: resourceQuantity2,
+				emptyResource: resourceQuantity2,
 			},
 			Requests: v1.ResourceList{
-				unadjustedResouce: resourceQuantity2,
+				emptyResource: resourceQuantity2,
 			},
 		}}},
 	}
+	// pod requiring missingResource will pass PredicateAdmit.
+	//
+	// Extended resources missing in node status are ignored in PredicateAdmit.
+	// This is required to support extended resources that are not managed by
+	// device plugin, such as cluster-level resources.
+	missingPodSpec := v1.PodSpec{NodeName: string(kl.nodeName),
+		Containers: []v1.Container{{Resources: v1.ResourceRequirements{
+			Limits: v1.ResourceList{
+				missingResource: resourceQuantity2,
+			},
+			Requests: v1.ResourceList{
+				missingResource: resourceQuantity2,
+			},
+		}}},
+	}
@@ -666,21 +685,18 @@ func TestHandlePluginResources(t *testing.T) {
 			},
 		}}},
 	}
-	pods := []*v1.Pod{
-		podWithUIDNameNsSpec("123", "fittingpod", "foo", fittingPodspec),
-		podWithUIDNameNsSpec("456", "exceededpod", "foo", exceededPodSpec),
-		podWithUIDNameNsSpec("789", "failedpod", "foo", failedPodSpec),
-	}
-	// The latter two pod should be rejected.
-	fittingPod := pods[0]
-	exceededPod := pods[1]
-	failedPod := pods[2]
+	fittingPod := podWithUIDNameNsSpec("1", "fittingpod", "foo", fittingPodSpec)
+	emptyPod := podWithUIDNameNsSpec("2", "emptypod", "foo", emptyPodSpec)
+	missingPod := podWithUIDNameNsSpec("3", "missingpod", "foo", missingPodSpec)
+	failedPod := podWithUIDNameNsSpec("4", "failedpod", "foo", failedPodSpec)

-	kl.HandlePodAdditions(pods)
+	kl.HandlePodAdditions([]*v1.Pod{fittingPod, emptyPod, missingPod, failedPod})

 	// Check pod status stored in the status map.
 	checkPodStatus(t, kl, fittingPod, v1.PodPending)
-	checkPodStatus(t, kl, exceededPod, v1.PodFailed)
+	checkPodStatus(t, kl, emptyPod, v1.PodFailed)
+	checkPodStatus(t, kl, missingPod, v1.PodPending)
 	checkPodStatus(t, kl, failedPod, v1.PodFailed)
 }
@@ -18,6 +18,7 @@ go_library(
     ],
     importpath = "k8s.io/kubernetes/pkg/kubelet/lifecycle",
     deps = [
+        "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/types:go_default_library",
         "//pkg/kubelet/util/format:go_default_library",
@@ -34,12 +35,17 @@ go_library(

 go_test(
     name = "go_default_test",
-    srcs = ["handlers_test.go"],
+    srcs = [
+        "handlers_test.go",
+        "predicate_test.go",
+    ],
     embed = [":go_default_library"],
     deps = [
         "//pkg/kubelet/container:go_default_library",
         "//pkg/kubelet/util/format:go_default_library",
+        "//pkg/scheduler/schedulercache:go_default_library",
         "//vendor/k8s.io/api/core/v1:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/intstr:go_default_library",
     ],
 )
@@ -20,7 +20,9 @@ import (
 	"fmt"

 	"github.com/golang/glog"
+
 	"k8s.io/api/core/v1"
+	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	"k8s.io/kubernetes/pkg/kubelet/util/format"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
@@ -77,7 +79,18 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
 			Message: message,
 		}
 	}
-	fit, reasons, err := predicates.GeneralPredicates(pod, nil, nodeInfo)
+
+	// Remove the requests of the extended resources that are missing in the
+	// node info. This is required to support cluster-level resources, which
+	// are extended resources unknown to nodes.
+	//
+	// Caveat: If a pod was manually bound to a node (e.g., static pod) where a
+	// node-level extended resource it requires is not found, then kubelet will
+	// not fail admission while it should. This issue will be addressed with
+	// the Resource Class API in the future.
+	podWithoutMissingExtendedResources := removeMissingExtendedResources(pod, nodeInfo)
+
+	fit, reasons, err := predicates.GeneralPredicates(podWithoutMissingExtendedResources, nil, nodeInfo)
 	if err != nil {
 		message := fmt.Sprintf("GeneralPredicates failed due to %v, which is unexpected.", err)
 		glog.Warningf("Failed to admit pod %v - %s", format.Pod(pod), message)
@@ -141,3 +154,22 @@ func (w *predicateAdmitHandler) Admit(attrs *PodAdmitAttributes) PodAdmitResult
 		Admit: true,
 	}
 }
+
+func removeMissingExtendedResources(pod *v1.Pod, nodeInfo *schedulercache.NodeInfo) *v1.Pod {
+	podCopy := pod.DeepCopy()
+	for i, c := range pod.Spec.Containers {
+		// We only handle requests in Requests but not Limits because the
+		// PodFitsResources predicate, to which the result pod will be passed,
+		// does not use Limits.
+		podCopy.Spec.Containers[i].Resources.Requests = make(v1.ResourceList)
+		for rName, rQuant := range c.Resources.Requests {
+			if v1helper.IsExtendedResourceName(rName) {
+				if _, found := nodeInfo.AllocatableResource().ScalarResources[rName]; !found {
+					continue
+				}
+			}
+			podCopy.Spec.Containers[i].Resources.Requests[rName] = rQuant
+		}
+	}
+	return podCopy
+}
pkg/kubelet/lifecycle/predicate_test.go (new file, 114 lines)
@@ -0,0 +1,114 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lifecycle

import (
	"reflect"
	"testing"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/kubernetes/pkg/scheduler/schedulercache"
)

var (
	quantity              = *resource.NewQuantity(1, resource.DecimalSI)
	extendedResourceName1 = "example.com/er1"
	extendedResourceName2 = "example.com/er2"
)

func TestRemoveMissingExtendedResources(t *testing.T) {
	for _, test := range []struct {
		desc string
		pod  *v1.Pod
		node *v1.Node

		expectedPod *v1.Pod
	}{
		{
			desc: "requests in Limits should be ignored",
			pod: makeTestPod(
				v1.ResourceList{},                        // Requests
				v1.ResourceList{"foo.com/bar": quantity}, // Limits
			),
			node: makeTestNode(
				v1.ResourceList{"foo.com/baz": quantity}, // Allocatable
			),
			expectedPod: makeTestPod(
				v1.ResourceList{},                        // Requests
				v1.ResourceList{"foo.com/bar": quantity}, // Limits
			),
		},
		{
			desc: "requests for resources available in node should not be removed",
			pod: makeTestPod(
				v1.ResourceList{"foo.com/bar": quantity}, // Requests
				v1.ResourceList{},                        // Limits
			),
			node: makeTestNode(
				v1.ResourceList{"foo.com/bar": quantity}, // Allocatable
			),
			expectedPod: makeTestPod(
				v1.ResourceList{"foo.com/bar": quantity}, // Requests
				v1.ResourceList{}),                       // Limits
		},
		{
			desc: "requests for resources unavailable in node should be removed",
			pod: makeTestPod(
				v1.ResourceList{"foo.com/bar": quantity}, // Requests
				v1.ResourceList{},                        // Limits
			),
			node: makeTestNode(
				v1.ResourceList{"foo.com/baz": quantity}, // Allocatable
			),
			expectedPod: makeTestPod(
				v1.ResourceList{}, // Requests
				v1.ResourceList{}, // Limits
			),
		},
	} {
		nodeInfo := schedulercache.NewNodeInfo()
		nodeInfo.SetNode(test.node)
		pod := removeMissingExtendedResources(test.pod, nodeInfo)
		if !reflect.DeepEqual(pod, test.expectedPod) {
			t.Errorf("%s: Expected pod\n%v\ngot\n%v\n", test.desc, test.expectedPod, pod)
		}
	}
}

func makeTestPod(requests, limits v1.ResourceList) *v1.Pod {
	return &v1.Pod{
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Resources: v1.ResourceRequirements{
						Requests: requests,
						Limits:   limits,
					},
				},
			},
		},
	}
}

func makeTestNode(allocatable v1.ResourceList) *v1.Node {
	return &v1.Node{
		Status: v1.NodeStatus{
			Allocatable: allocatable,
		},
	}
}
@@ -61,6 +61,7 @@ go_test(
         "//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
         "//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
     ],
 )
@@ -22,6 +22,7 @@ import (

 	"k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	"k8s.io/kubernetes/pkg/scheduler/schedulercache"
 	schedutil "k8s.io/kubernetes/pkg/scheduler/util"
@@ -53,6 +54,13 @@ type predicateMetadata struct {
 	serviceAffinityInUse               bool
 	serviceAffinityMatchingPodList     []*v1.Pod
 	serviceAffinityMatchingPodServices []*v1.Service
+	// ignoredExtendedResources is a set of extended resource names that will
+	// be ignored in the PodFitsResources predicate.
+	//
+	// They can be scheduler extender managed resources, the consumption of
+	// which should be accounted only by the extenders. This set is synthesized
+	// from scheduler extender configuration and does not change per pod.
+	ignoredExtendedResources sets.String
 }

 // Ensure that predicateMetadata implements algorithm.PredicateMetadata.
@@ -71,6 +79,17 @@ func RegisterPredicateMetadataProducer(predicateName string, precomp PredicateMe
 	predicateMetadataProducers[predicateName] = precomp
 }

+// RegisterPredicateMetadataProducerWithExtendedResourceOptions registers a
+// PredicateMetadataProducer that creates predicate metadata with the provided
+// options for extended resources.
+//
+// See the comments in "predicateMetadata" for the explanation of the options.
+func RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources sets.String) {
+	RegisterPredicateMetadataProducer("PredicateWithExtendedResourceOptions", func(pm *predicateMetadata) {
+		pm.ignoredExtendedResources = ignoredExtendedResources
+	})
+}
+
 // NewPredicateMetadataFactory creates a PredicateMetadataFactory.
 func NewPredicateMetadataFactory(podLister algorithm.PodLister) algorithm.PredicateMetadataProducer {
 	factory := &PredicateMetadataFactory{
@@ -170,10 +189,11 @@ func (meta *predicateMetadata) AddPod(addedPod *v1.Pod, nodeInfo *schedulercache
 // its maps and slices, but it does not copy the contents of pointer values.
 func (meta *predicateMetadata) ShallowCopy() algorithm.PredicateMetadata {
 	newPredMeta := &predicateMetadata{
-		pod:                  meta.pod,
-		podBestEffort:        meta.podBestEffort,
-		podRequest:           meta.podRequest,
-		serviceAffinityInUse: meta.serviceAffinityInUse,
+		pod:                      meta.pod,
+		podBestEffort:            meta.podBestEffort,
+		podRequest:               meta.podRequest,
+		serviceAffinityInUse:     meta.serviceAffinityInUse,
+		ignoredExtendedResources: meta.ignoredExtendedResources,
 	}
 	newPredMeta.podPorts = append([]*v1.ContainerPort(nil), meta.podPorts...)
 	newPredMeta.matchingAntiAffinityTerms = map[string][]matchingPodAntiAffinityTerm{}
@@ -29,6 +29,7 @@ import (
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/util/rand"
+	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	corelisters "k8s.io/client-go/listers/core/v1"
 	storagelisters "k8s.io/client-go/listers/storage/v1"
@@ -712,9 +713,15 @@ func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s
 		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
 	}

+	// No extended resources should be ignored by default.
+	ignoredExtendedResources := sets.NewString()
+
 	var podRequest *schedulercache.Resource
 	if predicateMeta, ok := meta.(*predicateMetadata); ok {
 		podRequest = predicateMeta.podRequest
+		if predicateMeta.ignoredExtendedResources != nil {
+			ignoredExtendedResources = predicateMeta.ignoredExtendedResources
+		}
 	} else {
 		// We couldn't parse metadata - fallback to computing it.
 		podRequest = GetResourceRequest(pod)
@@ -743,6 +750,13 @@ func PodFitsResources(pod *v1.Pod, meta algorithm.PredicateMetadata, nodeInfo *s
 	}

 	for rName, rQuant := range podRequest.ScalarResources {
+		if v1helper.IsExtendedResourceName(rName) {
+			// If this resource is one of the extended resources that should be
+			// ignored, we will skip checking it.
+			if ignoredExtendedResources.Has(string(rName)) {
+				continue
+			}
+		}
 		if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
 			predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
 		}
@@ -27,6 +27,7 @@ import (
 	storagev1 "k8s.io/api/storage/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/sets"
 	utilfeature "k8s.io/apiserver/pkg/util/feature"
 	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
 	kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@@ -93,11 +94,12 @@ func PredicateMetadata(p *v1.Pod, nodeInfo map[string]*schedulercache.NodeInfo)

 func TestPodFitsResources(t *testing.T) {
 	enoughPodsTests := []struct {
-		pod      *v1.Pod
-		nodeInfo *schedulercache.NodeInfo
-		fits     bool
-		test     string
-		reasons  []algorithm.PredicateFailureReason
+		pod                      *v1.Pod
+		nodeInfo                 *schedulercache.NodeInfo
+		fits                     bool
+		test                     string
+		reasons                  []algorithm.PredicateFailureReason
+		ignoredExtendedResources sets.String
 	}{
 		{
 			pod: &v1.Pod{},
@@ -323,12 +325,23 @@ func TestPodFitsResources(t *testing.T) {
 			test:    "hugepages resource allocatable enforced for multiple containers",
 			reasons: []algorithm.PredicateFailureReason{NewInsufficientResourceError(hugePageResourceA, 6, 2, 5)},
 		},
+		{
+			pod: newResourcePod(
+				schedulercache.Resource{MilliCPU: 1, Memory: 1, ScalarResources: map[v1.ResourceName]int64{extendedResourceB: 1}}),
+			nodeInfo: schedulercache.NewNodeInfo(
+				newResourcePod(schedulercache.Resource{MilliCPU: 0, Memory: 0})),
+			fits: true,
+			ignoredExtendedResources: sets.NewString(string(extendedResourceB)),
+			test: "skip checking ignored extended resource",
+		},
 	}

 	for _, test := range enoughPodsTests {
 		node := v1.Node{Status: v1.NodeStatus{Capacity: makeResources(10, 20, 0, 32, 5, 20, 5).Capacity, Allocatable: makeAllocatableResources(10, 20, 0, 32, 5, 20, 5)}}
 		test.nodeInfo.SetNode(&node)
-		fits, reasons, err := PodFitsResources(test.pod, PredicateMetadata(test.pod, nil), test.nodeInfo)
+		RegisterPredicateMetadataProducerWithExtendedResourceOptions(test.ignoredExtendedResources)
+		meta := PredicateMetadata(test.pod, nil)
+		fits, reasons, err := PodFitsResources(test.pod, meta, test.nodeInfo)
 		if err != nil {
 			t.Errorf("%s: unexpected error: %v", test.test, err)
 		}
@@ -41,6 +41,10 @@ type SchedulerExtender interface {

 	// IsBinder returns whether this extender is configured for the Bind method.
 	IsBinder() bool
+
+	// IsInterested returns true if at least one extended resource requested by
+	// this pod is managed by this extender.
+	IsInterested(pod *v1.Pod) bool
 }

 // ScheduleAlgorithm is an interface implemented by things that know how to schedule pods
@@ -151,6 +151,16 @@ type LabelPreference struct {
 	Presence bool
 }

+// ExtenderManagedResource describes the arguments of extended resources
+// managed by an extender.
+type ExtenderManagedResource struct {
+	// Name is the extended resource name.
+	Name v1.ResourceName
+	// IgnoredByScheduler indicates whether kube-scheduler should ignore this
+	// resource when applying predicates.
+	IgnoredByScheduler bool
+}
+
 // ExtenderConfig holds the parameters used to communicate with the extender. If a verb is unspecified/empty,
 // it is assumed that the extender chose not to provide that extension.
 type ExtenderConfig struct {
@@ -178,6 +188,16 @@ type ExtenderConfig struct {
 	// so the scheduler should only send minimal information about the eligible nodes
 	// assuming that the extender already cached full details of all nodes in the cluster
 	NodeCacheCapable bool
+	// ManagedResources is a list of extended resources that are managed by
+	// this extender.
+	// - A pod will be sent to the extender on the Filter, Prioritize and Bind
+	//   (if the extender is the binder) phases iff the pod requests at least
+	//   one of the extended resources in this list. If empty or unspecified,
+	//   all pods will be sent to this extender.
+	// - If IgnoredByScheduler is set to true for a resource, kube-scheduler
+	//   will skip checking the resource in predicates.
+	// +optional
+	ManagedResources []ExtenderManagedResource
 }

 // ExtenderArgs represents the arguments needed by the extender to filter/prioritize
@@ -125,6 +125,16 @@ type LabelPreference struct {
 	Presence bool `json:"presence"`
 }

+// ExtenderManagedResource describes the arguments of extended resources
+// managed by an extender.
+type ExtenderManagedResource struct {
+	// Name is the extended resource name.
+	Name apiv1.ResourceName `json:"name,casttype=ResourceName"`
+	// IgnoredByScheduler indicates whether kube-scheduler should ignore this
+	// resource when applying predicates.
+	IgnoredByScheduler bool `json:"ignoredByScheduler,omitempty"`
+}
+
 // ExtenderConfig holds the parameters used to communicate with the extender. If a verb is unspecified/empty,
 // it is assumed that the extender chose not to provide that extension.
 type ExtenderConfig struct {
@@ -152,6 +162,16 @@ type ExtenderConfig struct {
 	// so the scheduler should only send minimal information about the eligible nodes
 	// assuming that the extender already cached full details of all nodes in the cluster
 	NodeCacheCapable bool `json:"nodeCacheCapable,omitempty"`
+	// ManagedResources is a list of extended resources that are managed by
+	// this extender.
+	// - A pod will be sent to the extender on the Filter, Prioritize and Bind
+	//   (if the extender is the binder) phases iff the pod requests at least
+	//   one of the extended resources in this list. If empty or unspecified,
+	//   all pods will be sent to this extender.
+	// - If IgnoredByScheduler is set to true for a resource, kube-scheduler
+	//   will skip checking the resource in predicates.
+	// +optional
+	ManagedResources []ExtenderManagedResource `json:"managedResources,omitempty"`
 }

 // ExtenderArgs represents the arguments needed by the extender to filter/prioritize
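As a rough illustration of the wire format these JSON tags imply (the endpoint and resource name below are hypothetical, and other non-omitempty fields of `ExtenderConfig` also appear in real output), marshaling the v1 type shows the field names a policy file would use:

```go
package main

import (
	"encoding/json"
	"fmt"

	schedulerapiv1 "k8s.io/kubernetes/pkg/scheduler/api/v1"
)

func main() {
	cfg := schedulerapiv1.ExtenderConfig{
		URLPrefix: "http://127.0.0.1:8888/extender", // hypothetical endpoint
		ManagedResources: []schedulerapiv1.ExtenderManagedResource{{
			Name:               "example.com/foo", // hypothetical resource
			IgnoredByScheduler: true,
		}},
	}
	out, err := json.MarshalIndent(cfg, "", "  ")
	if err != nil {
		panic(err)
	}
	// The output contains, among other fields:
	//   "managedResources": [{"name": "example.com/foo", "ignoredByScheduler": true}]
	fmt.Println(string(out))
}
```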
pkg/scheduler/api/v1/zz_generated.deepcopy.go (generated, 21 lines)
@@ -109,6 +109,11 @@ func (in *ExtenderConfig) DeepCopyInto(out *ExtenderConfig) {
 			(*in).DeepCopyInto(*out)
 		}
 	}
+	if in.ManagedResources != nil {
+		in, out := &in.ManagedResources, &out.ManagedResources
+		*out = make([]ExtenderManagedResource, len(*in))
+		copy(*out, *in)
+	}
 	return
 }

@@ -167,6 +172,22 @@ func (in *ExtenderFilterResult) DeepCopy() *ExtenderFilterResult {
 	return out
 }

+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ExtenderManagedResource) DeepCopyInto(out *ExtenderManagedResource) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderManagedResource.
+func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource {
+	if in == nil {
+		return nil
+	}
+	out := new(ExtenderManagedResource)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) {
 	{
@@ -11,8 +11,12 @@ go_library(
     srcs = ["validation.go"],
     importpath = "k8s.io/kubernetes/pkg/scheduler/api/validation",
     deps = [
+        "//pkg/apis/core/v1/helper:go_default_library",
         "//pkg/scheduler/api:go_default_library",
+        "//vendor/k8s.io/api/core/v1:go_default_library",
         "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
+        "//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",
     ],
 )
|
||||
package validation
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/validation"
|
||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
|
||||
)
|
||||
|
||||
@ -35,6 +40,7 @@ func ValidatePolicy(policy schedulerapi.Policy) error {
|
||||
}
|
||||
|
||||
binders := 0
|
||||
extenderManagedResources := sets.NewString()
|
||||
for _, extender := range policy.ExtenderConfigs {
|
||||
if len(extender.PrioritizeVerb) > 0 && extender.Weight <= 0 {
|
||||
validationErrors = append(validationErrors, fmt.Errorf("Priority for extender %s should have a positive weight applied to it", extender.URLPrefix))
|
||||
@ -42,9 +48,35 @@ func ValidatePolicy(policy schedulerapi.Policy) error {
|
||||
if extender.BindVerb != "" {
|
||||
binders++
|
||||
}
|
||||
for _, resource := range extender.ManagedResources {
|
||||
errs := validateExtendedResourceName(resource.Name)
|
||||
if len(errs) != 0 {
|
||||
validationErrors = append(validationErrors, errs...)
|
||||
}
|
||||
if extenderManagedResources.Has(string(resource.Name)) {
|
||||
validationErrors = append(validationErrors, fmt.Errorf("Duplicate extender managed resource name %s", string(resource.Name)))
|
||||
}
|
||||
extenderManagedResources.Insert(string(resource.Name))
|
||||
}
|
||||
}
|
||||
if binders > 1 {
|
||||
validationErrors = append(validationErrors, fmt.Errorf("Only one extender can implement bind, found %v", binders))
|
||||
}
|
||||
return utilerrors.NewAggregate(validationErrors)
|
||||
}
|
||||
|
||||
// validateExtendedResourceName checks whether the specified name is a valid
|
||||
// extended resource name.
|
||||
func validateExtendedResourceName(name v1.ResourceName) []error {
|
||||
var validationErrors []error
|
||||
for _, msg := range validation.IsQualifiedName(string(name)) {
|
||||
validationErrors = append(validationErrors, errors.New(msg))
|
||||
}
|
||||
if len(validationErrors) != 0 {
|
||||
return validationErrors
|
||||
}
|
||||
if !v1helper.IsExtendedResourceName(name) {
|
||||
validationErrors = append(validationErrors, fmt.Errorf("%s is an invalid extended resource name", name))
|
||||
}
|
||||
return validationErrors
|
||||
}
|
||||
|
@@ -69,6 +69,21 @@ func TestValidatePolicy(t *testing.T) {
 			}},
 		expected: errors.New("Only one extender can implement bind, found 2"),
 	},
+	{
+		policy: api.Policy{
+			ExtenderConfigs: []api.ExtenderConfig{
+				{URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []api.ExtenderManagedResource{{Name: "foo.com/bar"}}},
+				{URLPrefix: "http://127.0.0.1:8082/extender", BindVerb: "bind", ManagedResources: []api.ExtenderManagedResource{{Name: "foo.com/bar"}}},
+			}},
+		expected: errors.New("Duplicate extender managed resource name foo.com/bar"),
+	},
+	{
+		policy: api.Policy{
+			ExtenderConfigs: []api.ExtenderConfig{
+				{URLPrefix: "http://127.0.0.1:8081/extender", ManagedResources: []api.ExtenderManagedResource{{Name: "kubernetes.io/foo"}}},
+			}},
+		expected: errors.New("kubernetes.io/foo is an invalid extended resource name"),
+	},
 }

 for _, test := range tests {
pkg/scheduler/api/zz_generated.deepcopy.go (generated, 21 lines)
@@ -109,6 +109,11 @@ func (in *ExtenderConfig) DeepCopyInto(out *ExtenderConfig) {
 			(*in).DeepCopyInto(*out)
 		}
 	}
+	if in.ManagedResources != nil {
+		in, out := &in.ManagedResources, &out.ManagedResources
+		*out = make([]ExtenderManagedResource, len(*in))
+		copy(*out, *in)
+	}
 	return
 }

@@ -167,6 +172,22 @@ func (in *ExtenderFilterResult) DeepCopy() *ExtenderFilterResult {
 	return out
 }

+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *ExtenderManagedResource) DeepCopyInto(out *ExtenderManagedResource) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ExtenderManagedResource.
+func (in *ExtenderManagedResource) DeepCopy() *ExtenderManagedResource {
+	if in == nil {
+		return nil
+	}
+	out := new(ExtenderManagedResource)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in FailedNodesMap) DeepCopyInto(out *FailedNodesMap) {
 	{
@@ -26,6 +26,7 @@ import (

 	"k8s.io/api/core/v1"
 	utilnet "k8s.io/apimachinery/pkg/util/net"
+	"k8s.io/apimachinery/pkg/util/sets"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/kubernetes/pkg/scheduler/algorithm"
 	schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
@@ -46,6 +47,7 @@ type HTTPExtender struct {
 	weight           int
 	client           *http.Client
 	nodeCacheCapable bool
+	managedResources sets.String
 }

 func makeTransport(config *schedulerapi.ExtenderConfig) (http.RoundTripper, error) {
@@ -85,6 +87,10 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx
 		Transport: transport,
 		Timeout:   config.HTTPTimeout,
 	}
+	managedResources := sets.NewString()
+	for _, r := range config.ManagedResources {
+		managedResources.Insert(string(r.Name))
+	}
 	return &HTTPExtender{
 		extenderURL: config.URLPrefix,
 		filterVerb:  config.FilterVerb,
@@ -93,6 +99,7 @@ func NewHTTPExtender(config *schedulerapi.ExtenderConfig) (algorithm.SchedulerEx
 		weight:           config.Weight,
 		client:           client,
 		nodeCacheCapable: config.NodeCacheCapable,
+		managedResources: managedResources,
 	}, nil
 }

@@ -252,3 +259,35 @@ func (h *HTTPExtender) send(action string, args interface{}, result interface{})

 	return json.NewDecoder(resp.Body).Decode(result)
 }
+
+// IsInterested returns true if at least one extended resource requested by
+// this pod is managed by this extender.
+func (h *HTTPExtender) IsInterested(pod *v1.Pod) bool {
+	if h.managedResources.Len() == 0 {
+		return true
+	}
+	if h.hasManagedResources(pod.Spec.Containers) {
+		return true
+	}
+	if h.hasManagedResources(pod.Spec.InitContainers) {
+		return true
+	}
+	return false
+}
+
+func (h *HTTPExtender) hasManagedResources(containers []v1.Container) bool {
+	for i := range containers {
+		container := &containers[i]
+		for resourceName := range container.Resources.Requests {
+			if h.managedResources.Has(string(resourceName)) {
+				return true
+			}
+		}
+		for resourceName := range container.Resources.Limits {
+			if h.managedResources.Has(string(resourceName)) {
+				return true
+			}
+		}
+	}
+	return false
+}
@@ -110,6 +110,7 @@ type FakeExtender struct {
 	weight           int
 	nodeCacheCapable bool
 	filteredNodes    []*v1.Node
+	unInterested     bool
 }

 func (f *FakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) ([]*v1.Node, schedulerapi.FailedNodesMap, error) {
@@ -183,6 +184,10 @@ func (f *FakeExtender) IsBinder() bool {
 	return true
 }

+func (f *FakeExtender) IsInterested(pod *v1.Pod) bool {
+	return !f.unInterested
+}
+
 var _ algorithm.SchedulerExtender = &FakeExtender{}

 func TestGenericSchedulerWithExtenders(t *testing.T) {
@@ -304,6 +309,28 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
 		expectedHost: "machine2", // machine2 has higher score
 		name:         "test 7",
 	},
+	{
+		// Scheduler is expected to not send pod to extender in
+		// Filter/Prioritize phases if the extender is not interested in
+		// the pod.
+		//
+		// If scheduler sends the pod by mistake, the test will fail
+		// because of the errors from errorPredicateExtender and/or
+		// errorPrioritizerExtender.
+		predicates:   map[string]algorithm.FitPredicate{"true": truePredicate},
+		prioritizers: []algorithm.PriorityConfig{{Map: EqualPriorityMap, Weight: 1}},
+		extenders: []FakeExtender{
+			{
+				predicates:   []fitPredicate{errorPredicateExtender},
+				prioritizers: []priorityConfig{{errorPrioritizerExtender, 10}},
+				unInterested: true,
+			},
+		},
+		nodes:        []string{"machine1", "machine2"},
+		expectsErr:   false,
+		expectedHost: "machine2", // machine2 has higher score
+		name:         "test 8",
+	},
 }

 for _, test := range tests {
@@ -354,6 +354,9 @@ func findNodesThatFit(

 	if len(filtered) > 0 && len(extenders) != 0 {
 		for _, extender := range extenders {
+			if !extender.IsInterested(pod) {
+				continue
+			}
 			filteredList, failedMap, err := extender.Filter(pod, filtered, nodeNameToInfo)
 			if err != nil {
 				return []*v1.Node{}, FailedPredicateMap{}, err
@@ -624,6 +627,9 @@ func PrioritizeNodes(
 	if len(extenders) != 0 && nodes != nil {
 		combinedScores := make(map[string]int, len(nodeNameToInfo))
 		for _, extender := range extenders {
+			if !extender.IsInterested(pod) {
+				continue
+			}
 			wg.Add(1)
 			go func(ext algorithm.SchedulerExtender) {
 				defer wg.Done()
@@ -969,6 +969,7 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler

 	extenders := make([]algorithm.SchedulerExtender, 0)
 	if len(policy.ExtenderConfigs) != 0 {
+		ignoredExtendedResources := sets.NewString()
 		for ii := range policy.ExtenderConfigs {
 			glog.V(2).Infof("Creating extender with config %+v", policy.ExtenderConfigs[ii])
 			extender, err := core.NewHTTPExtender(&policy.ExtenderConfigs[ii])
@@ -976,7 +977,13 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
 				return nil, err
 			}
 			extenders = append(extenders, extender)
+			for _, r := range policy.ExtenderConfigs[ii].ManagedResources {
+				if r.IgnoredByScheduler {
+					ignoredExtendedResources.Insert(string(r.Name))
+				}
+			}
 		}
+		predicates.RegisterPredicateMetadataProducerWithExtendedResourceOptions(ignoredExtendedResources)
 	}
 	// Providing HardPodAffinitySymmetricWeight in the policy config is the new and preferred way of providing the value.
 	// Give it higher precedence than scheduler CLI configuration when it is provided.
@@ -992,14 +999,22 @@ func (c *configFactory) CreateFromConfig(policy schedulerapi.Policy) (*scheduler
 	return c.CreateFromKeys(predicateKeys, priorityKeys, extenders)
 }

-// getBinder returns an extender that supports bind or a default binder.
-func (c *configFactory) getBinder(extenders []algorithm.SchedulerExtender) scheduler.Binder {
+// getBinderFunc returns an func which returns an extender that supports bind or a default binder based on the given pod.
+func (c *configFactory) getBinderFunc(extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) scheduler.Binder {
+	var extenderBinder algorithm.SchedulerExtender
 	for i := range extenders {
 		if extenders[i].IsBinder() {
-			return extenders[i]
+			extenderBinder = extenders[i]
+			break
 		}
 	}
-	return &binder{c.client}
+	defaultBinder := &binder{c.client}
+	return func(pod *v1.Pod) scheduler.Binder {
+		if extenderBinder != nil && extenderBinder.IsInterested(pod) {
+			return extenderBinder
+		}
+		return defaultBinder
+	}
 }

 // Creates a scheduler from a set of registered fit predicate keys and priority keys.
@@ -1051,7 +1066,7 @@ func (c *configFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
 		// The scheduler only needs to consider schedulable nodes.
 		NodeLister: &nodeLister{c.nodeLister},
 		Algorithm:  algo,
-		Binder:     c.getBinder(extenders),
+		GetBinder:  c.getBinderFunc(extenders),
 		PodConditionUpdater: &podConditionUpdater{c.client},
 		PodPreemptor:        &podPreemptor{c.client},
 		WaitForCacheSync: func() bool {
@@ -17,6 +17,8 @@ limitations under the License.
 package factory

 import (
+	"errors"
+	"fmt"
 	"net/http"
 	"net/http/httptest"
 	"reflect"
@@ -533,3 +535,85 @@ func newConfigFactory(client *clientset.Clientset, hardPodAffinitySymmetricWeigh
 		enableEquivalenceCache,
 	)
 }
+
+type fakeExtender struct {
+	isBinder          bool
+	interestedPodName string
+}
+
+func (f *fakeExtender) Filter(pod *v1.Pod, nodes []*v1.Node, nodeNameToInfo map[string]*schedulercache.NodeInfo) (filteredNodes []*v1.Node, failedNodesMap schedulerapi.FailedNodesMap, err error) {
+	return nil, nil, nil
+}
+
+func (f *fakeExtender) Prioritize(pod *v1.Pod, nodes []*v1.Node) (hostPriorities *schedulerapi.HostPriorityList, weight int, err error) {
+	return nil, 0, nil
+}
+
+func (f *fakeExtender) Bind(binding *v1.Binding) error {
+	if f.isBinder {
+		return nil
+	}
+	return errors.New("not a binder")
+}
+
+func (f *fakeExtender) IsBinder() bool {
+	return f.isBinder
+}
+
+func (f *fakeExtender) IsInterested(pod *v1.Pod) bool {
+	return pod != nil && pod.Name == f.interestedPodName
+}
+
+func TestGetBinderFunc(t *testing.T) {
+	for _, test := range []struct {
+		podName   string
+		extenders []algorithm.SchedulerExtender
+
+		expectedBinderType string
+	}{
+		// Expect to return the default binder because the extender is not a
+		// binder, even though it's interested in the pod.
+		{
+			podName: "pod0",
+			extenders: []algorithm.SchedulerExtender{
+				&fakeExtender{isBinder: false, interestedPodName: "pod0"},
+			},
+			expectedBinderType: "*factory.binder",
+		},
+		// Expect to return the fake binder because one of the extenders is a
+		// binder and it's interested in the pod.
+		{
+			podName: "pod0",
+			extenders: []algorithm.SchedulerExtender{
+				&fakeExtender{isBinder: false, interestedPodName: "pod0"},
+				&fakeExtender{isBinder: true, interestedPodName: "pod0"},
+			},
+			expectedBinderType: "*factory.fakeExtender",
+		},
+		// Expect to return the default binder because one of the extenders is
+		// a binder but the binder is not interested in the pod.
+		{
+			podName: "pod1",
+			extenders: []algorithm.SchedulerExtender{
+				&fakeExtender{isBinder: false, interestedPodName: "pod1"},
+				&fakeExtender{isBinder: true, interestedPodName: "pod0"},
+			},
+			expectedBinderType: "*factory.binder",
+		},
+	} {
+		pod := &v1.Pod{
+			ObjectMeta: metav1.ObjectMeta{
+				Name: test.podName,
+			},
+		}
+
+		f := &configFactory{}
+		binderFunc := f.getBinderFunc(test.extenders)
+		binder := binderFunc(pod)
+
+		binderType := fmt.Sprintf("%s", reflect.TypeOf(binder))
+		if binderType != test.expectedBinderType {
+			t.Errorf("Expected binder %q but got %q", test.expectedBinderType, binderType)
+		}
+	}
+}
@@ -107,7 +107,7 @@ type Config struct {
 	Ecache     *core.EquivalenceCache
 	NodeLister algorithm.NodeLister
 	Algorithm  algorithm.ScheduleAlgorithm
-	Binder     Binder
+	GetBinder  func(pod *v1.Pod) Binder
 	// PodConditionUpdater is used only in case of scheduling errors. If we succeed
 	// with scheduling, PodScheduled condition will be updated in apiserver in /bind
 	// handler so that binding and setting PodCondition it is atomic.
@@ -403,7 +403,7 @@ func (sched *Scheduler) bind(assumed *v1.Pod, b *v1.Binding) error {
 	bindingStart := time.Now()
 	// If binding succeeded then PodScheduled condition will be updated in apiserver so that
 	// it's atomic with setting host.
-	err := sched.config.Binder.Bind(b)
+	err := sched.config.GetBinder(assumed).Bind(b)
 	if err := sched.config.SchedulerCache.FinishBinding(assumed); err != nil {
 		glog.Errorf("scheduler cache FinishBinding failed: %v", err)
 	}
@@ -195,10 +195,12 @@ func TestScheduler(t *testing.T) {
 			[]*v1.Node{&testNode},
 		),
 		Algorithm: item.algo,
-		Binder: fakeBinder{func(b *v1.Binding) error {
-			gotBinding = b
-			return item.injectBindError
-		}},
+		GetBinder: func(pod *v1.Pod) Binder {
+			return fakeBinder{func(b *v1.Binding) error {
+				gotBinding = b
+				return item.injectBindError
+			}}
+		},
 		PodConditionUpdater: fakePodConditionUpdater{},
 		Error: func(p *v1.Pod, err error) {
 			gotPod = p
@@ -543,10 +545,12 @@ func setupTestScheduler(queuedPodStore *clientcache.FIFO, scache schedulercache.
 		SchedulerCache: scache,
 		NodeLister:     nodeLister,
 		Algorithm:      algo,
-		Binder: fakeBinder{func(b *v1.Binding) error {
-			bindingChan <- b
-			return nil
-		}},
+		GetBinder: func(pod *v1.Pod) Binder {
+			return fakeBinder{func(b *v1.Binding) error {
+				bindingChan <- b
+				return nil
+			}}
+		},
 		NextPod: func() *v1.Pod {
 			return clientcache.Pop(queuedPodStore).(*v1.Pod)
 		},
@@ -588,11 +592,13 @@ func setupTestSchedulerLongBindingWithRetry(queuedPodStore *clientcache.FIFO, sc
 		SchedulerCache: scache,
 		NodeLister:     nodeLister,
 		Algorithm:      algo,
-		Binder: fakeBinder{func(b *v1.Binding) error {
-			time.Sleep(bindingTime)
-			bindingChan <- b
-			return nil
-		}},
+		GetBinder: func(pod *v1.Pod) Binder {
+			return fakeBinder{func(b *v1.Binding) error {
+				time.Sleep(bindingTime)
+				bindingChan <- b
+				return nil
+			}}
+		},
 		WaitForCacheSync: func() bool {
 			return true
 		},
@@ -47,9 +47,10 @@ import (
 )

 const (
-	filter     = "filter"
-	prioritize = "prioritize"
-	bind       = "bind"
+	filter               = "filter"
+	prioritize           = "prioritize"
+	bind                 = "bind"
+	extendedResourceName = "foo.com/bar"
 )

 type fitPredicate func(pod *v1.Pod, node *v1.Node) (bool, error)
@@ -343,6 +344,12 @@ func TestSchedulerExtender(t *testing.T) {
 			BindVerb:    bind,
 			Weight:      4,
 			EnableHTTPS: false,
+			ManagedResources: []schedulerapi.ExtenderManagedResource{
+				{
+					Name:               extendedResourceName,
+					IgnoredByScheduler: true,
+				},
+			},
 		},
 		{
 			URLPrefix: es3.URL,
@@ -420,7 +427,17 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface)
 	pod := &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{Name: "extender-test-pod"},
 		Spec: v1.PodSpec{
-			Containers: []v1.Container{{Name: "container", Image: e2e.GetPauseImageName(cs)}},
+			Containers: []v1.Container{
+				{
+					Name:  "container",
+					Image: e2e.GetPauseImageName(cs),
+					Resources: v1.ResourceRequirements{
+						Limits: v1.ResourceList{
+							extendedResourceName: *resource.NewQuantity(1, resource.DecimalSI),
+						},
+					},
+				},
+			},
 		},
 	}