Mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-07-23 03:41:45 +00:00)

Merge pull request #78097 from draveness/feature/post-filter-extension-point

feat: implement "post-filter" extension point for scheduling framework

Commit cc270c138d
@@ -59,6 +59,7 @@ go_test(
         "//staging/src/k8s.io/api/core/v1:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
+        "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library",
         "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",

@@ -29,7 +29,7 @@ import (
 
 	"k8s.io/klog"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	policy "k8s.io/api/policy/v1beta1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"

@@ -89,9 +89,10 @@ type FailedPredicateMap map[string][]predicates.PredicateFailureReason
 
 // FitError describes a fit error of a pod.
 type FitError struct {
 	Pod              *v1.Pod
 	NumAllNodes      int
 	FailedPredicates FailedPredicateMap
+	FilteredNodesStatuses framework.NodeToStatusMap
 }
 
 // ErrNoNodesAvailable is used to describe the error that no nodes available to schedule pods.

@@ -111,6 +112,10 @@ func (f *FitError) Error() string {
 		}
 	}
 
+	for _, status := range f.FilteredNodesStatuses {
+		reasons[status.Message()]++
+	}
+
 	sortReasonsHistogram := func() []string {
 		reasonStrings := []string{}
 		for k, v := range reasons {

@@ -206,16 +211,23 @@ func (g *genericScheduler) Schedule(pod *v1.Pod, nodeLister algorithm.NodeLister
 
 	trace.Step("Basic checks done")
 	startPredicateEvalTime := time.Now()
-	filteredNodes, failedPredicateMap, err := g.findNodesThatFit(pluginContext, pod, nodeLister)
+	filteredNodes, failedPredicateMap, filteredNodesStatuses, err := g.findNodesThatFit(pluginContext, pod, nodeLister)
 	if err != nil {
 		return result, err
 	}
 
+	// Run "postfilter" plugins.
+	postfilterStatus := g.framework.RunPostFilterPlugins(pluginContext, pod, filteredNodes, filteredNodesStatuses)
+	if !postfilterStatus.IsSuccess() {
+		return result, postfilterStatus.AsError()
+	}
+
 	if len(filteredNodes) == 0 {
 		return result, &FitError{
 			Pod:              pod,
 			NumAllNodes:      numNodes,
 			FailedPredicates: failedPredicateMap,
+			FilteredNodesStatuses: filteredNodesStatuses,
 		}
 	}
 	trace.Step("Computing predicates done")

@@ -449,9 +461,10 @@ func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes i
 
 // Filters the nodes to find the ones that fit based on the given predicate functions
 // Each node is passed through the predicate functions to determine if it is a fit
-func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, error) {
+func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginContext, pod *v1.Pod, nodeLister algorithm.NodeLister) ([]*v1.Node, FailedPredicateMap, framework.NodeToStatusMap, error) {
 	var filtered []*v1.Node
 	failedPredicateMap := FailedPredicateMap{}
+	filteredNodesStatuses := framework.NodeToStatusMap{}
 
 	if len(g.predicates) == 0 {
 		filtered = nodeLister.ListNodes()

@@ -495,8 +508,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
 			status := g.framework.RunFilterPlugins(pluginContext, pod, nodeName)
 			if !status.IsSuccess() {
 				predicateResultLock.Lock()
-				failedPredicateMap[nodeName] = append(failedPredicateMap[nodeName],
-					predicates.NewFailureReason(status.Message()))
+				filteredNodesStatuses[nodeName] = status
 				if status.Code() != framework.Unschedulable {
 					errs[status.Message()]++
 				}

@@ -524,7 +536,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
 
 		filtered = filtered[:filteredLen]
 		if len(errs) > 0 {
-			return []*v1.Node{}, FailedPredicateMap{}, errors.CreateAggregateFromMessageCountMap(errs)
+			return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, errors.CreateAggregateFromMessageCountMap(errs)
 		}
 	}
 

@@ -539,9 +551,9 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
 				klog.Warningf("Skipping extender %v as it returned error %v and has ignorable flag set",
 					extender, err)
 				continue
-			} else {
-				return []*v1.Node{}, FailedPredicateMap{}, err
 			}
+
+			return []*v1.Node{}, FailedPredicateMap{}, framework.NodeToStatusMap{}, err
 		}
 
 		for failedNodeName, failedMsg := range failedMap {

@@ -556,7 +568,7 @@ func (g *genericScheduler) findNodesThatFit(pluginContext *framework.PluginConte
 			}
 		}
 	}
-	return filtered, failedPredicateMap, nil
+	return filtered, failedPredicateMap, filteredNodesStatuses, nil
 }
 
 // addNominatedPods adds pods with equal or greater priority which are nominated
@@ -26,9 +26,10 @@ import (
 	"time"
 
 	apps "k8s.io/api/apps/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/errors"
 	"k8s.io/apimachinery/pkg/util/sets"

@@ -139,6 +140,42 @@ func getNodeReducePriority(pod *v1.Pod, meta interface{}, nodeNameToInfo map[str
 var EmptyPluginRegistry = framework.Registry{}
 var emptyFramework, _ = framework.NewFramework(EmptyPluginRegistry, nil, []schedulerconfig.PluginConfig{})
 
+// FakeFilterPlugin is a test filter plugin used by default scheduler.
+type FakeFilterPlugin struct {
+	numFilterCalled int
+	failFilter      bool
+}
+
+var filterPlugin = &FakeFilterPlugin{}
+
+// Name returns name of the plugin.
+func (fp *FakeFilterPlugin) Name() string {
+	return "fake-filter-plugin"
+}
+
+// reset is used to reset filter plugin.
+func (fp *FakeFilterPlugin) reset() {
+	fp.numFilterCalled = 0
+	fp.failFilter = false
+}
+
+// Filter is a test function that returns an error or nil, depending on the
+// value of "failFilter".
+func (fp *FakeFilterPlugin) Filter(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
+	fp.numFilterCalled++
+
+	if fp.failFilter {
+		return framework.NewStatus(framework.Unschedulable, fmt.Sprintf("injecting failure for pod %v", pod.Name))
+	}
+
+	return nil
+}
+
+// NewFilterPlugin is the factory for filter plugin.
+func NewFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
+	return filterPlugin, nil
+}
+
 func makeNodeList(nodeNames []string) []*v1.Node {
 	result := make([]*v1.Node, 0, len(nodeNames))
 	for _, nodeName := range nodeNames {
@@ -219,6 +256,21 @@ func TestSelectHost(t *testing.T) {
 
 func TestGenericScheduler(t *testing.T) {
 	defer algorithmpredicates.SetPredicatesOrderingDuringTest(order)()
+
+	filterPluginRegistry := framework.Registry{filterPlugin.Name(): NewFilterPlugin}
+	filterFramework, err := framework.NewFramework(filterPluginRegistry, &schedulerconfig.Plugins{
+		Filter: &schedulerconfig.PluginSet{
+			Enabled: []schedulerconfig.Plugin{
+				{
+					Name: filterPlugin.Name(),
+				},
+			},
+		},
+	}, []schedulerconfig.PluginConfig{})
+	if err != nil {
+		t.Errorf("Unexpected error when initialize scheduling framework, err :%v", err.Error())
+	}
+
 	tests := []struct {
 		name       string
 		predicates map[string]algorithmpredicates.FitPredicate

@@ -229,6 +281,7 @@ func TestGenericScheduler(t *testing.T) {
 		pod           *v1.Pod
 		pods          []*v1.Pod
 		buildPredMeta bool // build predicates metadata or not
+		failFilter    bool
 		expectedHosts sets.String
 		expectsErr    bool
 		wErr          error

@@ -246,7 +299,9 @@ func TestGenericScheduler(t *testing.T) {
 				FailedPredicates: FailedPredicateMap{
 					"machine1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
 					"machine2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
-				}},
+				},
+				FilteredNodesStatuses: framework.NodeToStatusMap{},
+			},
 		},
 		{
 			predicates: map[string]algorithmpredicates.FitPredicate{"true": truePredicate},

@@ -309,6 +364,7 @@ func TestGenericScheduler(t *testing.T) {
 					"2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
 					"1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
 				},
+				FilteredNodesStatuses: framework.NodeToStatusMap{},
 			},
 		},
 		{

@@ -339,6 +395,7 @@ func TestGenericScheduler(t *testing.T) {
 					"1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
 					"2": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate},
 				},
+				FilteredNodesStatuses: framework.NodeToStatusMap{},
 			},
 		},
 		{

@@ -426,6 +483,7 @@ func TestGenericScheduler(t *testing.T) {
 				FailedPredicates: FailedPredicateMap{
 					"1": []algorithmpredicates.PredicateFailureReason{algorithmpredicates.ErrFakePredicate, algorithmpredicates.ErrFakePredicate},
 				},
+				FilteredNodesStatuses: framework.NodeToStatusMap{},
 			},
 		},
 		{

@@ -538,9 +596,29 @@ func TestGenericScheduler(t *testing.T) {
 			expectedHosts: sets.NewString("machine2", "machine3"),
 			wErr:          nil,
 		},
+		{
+			name:          "test with failed filter plugin",
+			predicates:    map[string]algorithmpredicates.FitPredicate{"true": truePredicate},
+			prioritizers:  []priorities.PriorityConfig{{Function: numericPriority, Weight: 1}},
+			nodes:         []string{"3"},
+			pod:           &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
+			expectedHosts: nil,
+			failFilter:    true,
+			expectsErr:    true,
+			wErr: &FitError{
+				Pod:              &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "test-filter", UID: types.UID("test-filter")}},
+				NumAllNodes:      1,
+				FailedPredicates: FailedPredicateMap{},
+				FilteredNodesStatuses: framework.NodeToStatusMap{
+					"3": framework.NewStatus(framework.Unschedulable, "injecting failure for pod test-filter"),
+				},
+			},
+		},
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			filterPlugin.failFilter = test.failFilter
+
 			cache := internalcache.New(time.Duration(0), wait.NeverStop)
 			for _, pod := range test.pods {
 				cache.AddPod(pod)

@@ -564,7 +642,7 @@ func TestGenericScheduler(t *testing.T) {
 				predMetaProducer,
 				test.prioritizers,
 				priorities.EmptyPriorityMetadataProducer,
-				emptyFramework,
+				filterFramework,
 				[]algorithm.SchedulerExtender{},
 				nil,
 				pvcLister,

@@ -575,11 +653,13 @@ func TestGenericScheduler(t *testing.T) {
 				false)
 			result, err := scheduler.Schedule(test.pod, schedulertesting.FakeNodeLister(makeNodeList(test.nodes)), framework.NewPluginContext())
 			if !reflect.DeepEqual(err, test.wErr) {
-				t.Errorf("Unexpected error: %v, expected: %v", err, test.wErr)
+				t.Errorf("Unexpected error: %v, expected: %v", err.Error(), test.wErr)
 			}
 			if test.expectedHosts != nil && !test.expectedHosts.Has(result.SuggestedHost) {
 				t.Errorf("Expected: %s, got: %s", test.expectedHosts, result.SuggestedHost)
 			}
+
+			filterPlugin.reset()
 		})
 	}
 }

@@ -613,7 +693,7 @@ func TestFindFitAllError(t *testing.T) {
 	nodes := makeNodeList([]string{"3", "2", "1"})
 	scheduler := makeScheduler(predicates, nodes)
 
-	_, predicateMap, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, schedulertesting.FakeNodeLister(nodes))
+	_, predicateMap, _, err := scheduler.findNodesThatFit(nil, &v1.Pod{}, schedulertesting.FakeNodeLister(nodes))
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)

@@ -643,7 +723,7 @@ func TestFindFitSomeError(t *testing.T) {
 	scheduler := makeScheduler(predicates, nodes)
 
 	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "1", UID: types.UID("1")}}
-	_, predicateMap, err := scheduler.findNodesThatFit(nil, pod, schedulertesting.FakeNodeLister(nodes))
+	_, predicateMap, _, err := scheduler.findNodesThatFit(nil, pod, schedulertesting.FakeNodeLister(nodes))
 
 	if err != nil {
 		t.Errorf("unexpected error: %v", err)
@@ -21,7 +21,7 @@ import (
 	"fmt"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/client-go/util/workqueue"

@@ -41,6 +41,7 @@ type framework struct {
 	queueSortPlugins          []QueueSortPlugin
 	prefilterPlugins          []PrefilterPlugin
 	filterPlugins             []FilterPlugin
+	postFilterPlugins         []PostFilterPlugin
 	scorePlugins              []ScorePlugin
 	scoreWithNormalizePlugins []ScoreWithNormalizePlugin
 	reservePlugins            []ReservePlugin

@@ -168,6 +169,20 @@ func NewFramework(r Registry, plugins *config.Plugins, args []config.PluginConfi
 		}
 	}
 
+	if plugins.PostFilter != nil {
+		for _, r := range plugins.PostFilter.Enabled {
+			if pg, ok := pluginsMap[r.Name]; ok {
+				p, ok := pg.(PostFilterPlugin)
+				if !ok {
+					return nil, fmt.Errorf("plugin %v does not extend post-filter plugin", r.Name)
+				}
+				f.postFilterPlugins = append(f.postFilterPlugins, p)
+			} else {
+				return nil, fmt.Errorf("post-filter plugin %v does not exist", r.Name)
+			}
+		}
+	}
+
 	if plugins.PreBind != nil {
 		for _, pb := range plugins.PreBind.Enabled {
 			if pg, ok := pluginsMap[pb.Name]; ok {
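For illustration only (this block is not part of the diff): the sketch below shows the shape of a plugin configuration that would enable a post-filter plugin through the PostFilter set handled by NewFramework above. The plugin name is hypothetical, and the schedulerconfig import path and alias are assumed to match the test files elsewhere in this commit.

package example

import (
	schedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
)

// pluginsWithPostFilter enables one hypothetical post-filter plugin; NewFramework
// looks its name up in the Registry and type-asserts it to PostFilterPlugin.
var pluginsWithPostFilter = &schedulerconfig.Plugins{
	PostFilter: &schedulerconfig.PluginSet{
		Enabled: []schedulerconfig.Plugin{
			{Name: "example-post-filter"},
		},
	},
}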
@@ -287,6 +302,7 @@ func (f *framework) RunPrefilterPlugins(
 			return NewStatus(Error, msg)
 		}
 	}
+
 	return nil
 }
 

@@ -296,13 +312,12 @@ func (f *framework) RunPrefilterPlugins(
 // Meanwhile, the failure message and status are set for the given node.
 func (f *framework) RunFilterPlugins(pc *PluginContext,
 	pod *v1.Pod, nodeName string) *Status {
-	for _, p := range f.filterPlugins {
-		status := p.Filter(pc, pod, nodeName)
+	for _, pl := range f.filterPlugins {
+		status := pl.Filter(pc, pod, nodeName)
 		if !status.IsSuccess() {
 			if status.Code() != Unschedulable {
-				errMsg := fmt.Sprintf("RunFilterPlugins: error while running %s filter plugin for pod %s: %s",
-					p.Name(), pod.Name, status.Message())
+				errMsg := fmt.Sprintf("RunFilterPlugins: error while running %v filter plugin for pod %v: %v",
+					pl.Name(), pod.Name, status.Message())
 				klog.Error(errMsg)
 				return NewStatus(Error, errMsg)
 			}

@@ -313,6 +328,27 @@ func (f *framework) RunFilterPlugins(pc *PluginContext,
 	return nil
 }
 
+// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
+// of these plugins returns any status other than "Success", the given node is
+// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.
+func (f *framework) RunPostFilterPlugins(
+	pc *PluginContext,
+	pod *v1.Pod,
+	nodes []*v1.Node,
+	filteredNodesStatuses NodeToStatusMap,
+) *Status {
+	for _, pl := range f.postFilterPlugins {
+		status := pl.PostFilter(pc, pod, nodes, filteredNodesStatuses)
+		if !status.IsSuccess() {
+			msg := fmt.Sprintf("error while running %v postfilter plugin for pod %v: %v", pl.Name(), pod.Name, status.Message())
+			klog.Error(msg)
+			return NewStatus(Error, msg)
+		}
+	}
+
+	return nil
+}
+
 // RunScorePlugins runs the set of configured scoring plugins. It returns a map that
 // stores for each scoring plugin name the corresponding NodeScoreList(s).
 // It also returns *Status, which is set to non-success if any of the plugins returns
@@ -22,7 +22,7 @@ import (
 	"errors"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/types"
 	internalcache "k8s.io/kubernetes/pkg/scheduler/internal/cache"
 )

@@ -36,6 +36,9 @@ type NodeScoreList []int
 // PluginToNodeScoreMap declares a map from plugin name to its NodeScoreList.
 type PluginToNodeScoreMap map[string]NodeScoreList
 
+// NodeToStatusMap declares map from node name to its status.
+type NodeToStatusMap map[string]*Status
+
 // These are predefined codes used in a Status.
 const (
 	// Success means that plugin ran correctly and found pod schedulable.
@@ -160,6 +163,19 @@ type FilterPlugin interface {
 	Filter(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
 }
 
+// PostFilterPlugin is an interface for Post-filter plugin. Post-filter is an
+// informational extension point. Plugins will be called with a list of nodes
+// that passed the filtering phase. A plugin may use this data to update internal
+// state or to generate logs/metrics.
+type PostFilterPlugin interface {
+	Plugin
+	// PostFilter is called by the scheduling framework after a list of nodes
+	// passed the filtering phase. All postfilter plugins must return success or
+	// the pod will be rejected. The filteredNodesStatuses is the set of filtered nodes
+	// and their filter status.
+	PostFilter(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
+}
+
 // ScorePlugin is an interface that must be implemented by "score" plugins to rank
 // nodes that passed the filtering phase.
 type ScorePlugin interface {
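For illustration only (this block is not part of the diff): a minimal, purely informational post-filter plugin written against the PostFilterPlugin interface added above. The plugin name, package, and import paths are assumptions based on the code in this commit; the factory signature mirrors the New*Plugin factories used in the tests.

package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/klog"

	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// nodeCountLogger is a hypothetical post-filter plugin that only observes the
// outcome of the filtering phase and never changes the scheduling result.
type nodeCountLogger struct{}

// Name returns the plugin name used for registration.
func (nodeCountLogger) Name() string { return "node-count-logger" }

// PostFilter logs the filtering outcome and always succeeds; returning a
// non-success Status here would abort scheduling of the pod.
func (nodeCountLogger) PostFilter(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {
	klog.Infof("pod %v: %d nodes passed filtering, %d nodes were rejected", pod.Name, len(nodes), len(filteredNodesStatuses))
	return nil
}

// NewNodeCountLogger is a factory with the signature NewFramework expects in its Registry.
func NewNodeCountLogger(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
	return nodeCountLogger{}, nil
}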
@@ -269,6 +285,11 @@ type Framework interface {
 	// the given node is not suitable for running the pod.
 	RunFilterPlugins(pc *PluginContext, pod *v1.Pod, nodeName string) *Status
 
+	// RunPostFilterPlugins runs the set of configured post-filter plugins. If any
+	// of these plugins returns any status other than "Success", the given node is
+	// rejected. The filteredNodeStatuses is the set of filtered nodes and their statuses.
+	RunPostFilterPlugins(pc *PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses NodeToStatusMap) *Status
+
 	// RunScorePlugins runs the set of configured scoring plugins. It returns a map that
 	// stores for each scoring plugin name the corresponding NodeScoreList(s).
 	// It also returns *Status, which is set to non-success if any of the plugins returns
@@ -24,7 +24,7 @@ import (
 	"time"
 
 	dto "github.com/prometheus/client_model/go"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/clock"

@@ -197,6 +197,10 @@ func (*fakeFramework) RunBindPlugins(pc *framework.PluginContext, pod *v1.Pod, n
 
 func (*fakeFramework) RunPostbindPlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) {}
 
+func (*fakeFramework) RunPostFilterPlugins(pc *framework.PluginContext, pod *v1.Pod, nodes []*v1.Node, filteredNodesStatuses framework.NodeToStatusMap) *framework.Status {
+	return nil
+}
+
 func (*fakeFramework) RunReservePlugins(pc *framework.PluginContext, pod *v1.Pod, nodeName string) *framework.Status {
 	return nil
 }

@@ -26,7 +26,7 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/api/events/v1beta1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -415,9 +415,12 @@ func TestSchedulerNoPhantomPodAfterDelete(t *testing.T) {
 	select {
 	case err := <-errChan:
 		expectErr := &core.FitError{
 			Pod:         secondPod,
 			NumAllNodes: 1,
-			FailedPredicates: core.FailedPredicateMap{node.Name: []predicates.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts}},
+			FailedPredicates: core.FailedPredicateMap{
+				node.Name: []predicates.PredicateFailureReason{predicates.ErrPodNotFitsHostPorts},
+			},
+			FilteredNodesStatuses: framework.NodeToStatusMap{},
 		}
 		if !reflect.DeepEqual(expectErr, err) {
 			t.Errorf("err want=%v, get=%v", expectErr, err)

@@ -620,9 +623,10 @@ func TestSchedulerFailedSchedulingReasons(t *testing.T) {
 	select {
 	case err := <-errChan:
 		expectErr := &core.FitError{
 			Pod:              podWithTooBigResourceRequests,
 			NumAllNodes:      len(nodes),
 			FailedPredicates: failedPredicatesMap,
+			FilteredNodesStatuses: framework.NodeToStatusMap{},
 		}
 		if len(fmt.Sprint(expectErr)) > 150 {
 			t.Errorf("message is too spammy ! %v ", len(fmt.Sprint(expectErr)))
@@ -21,7 +21,7 @@ import (
 	"testing"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/resource"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"

@@ -58,6 +58,11 @@ type ReservePlugin struct {
 	failReserve bool
 }
 
+type PostFilterPlugin struct {
+	numPostFilterCalled int
+	failPostFilter      bool
+}
+
 type PrebindPlugin struct {
 	numPrebindCalled int
 	failPrebind      bool

@@ -100,6 +105,7 @@ const (
 	scorePluginName              = "score-plugin"
 	scoreWithNormalizePluginName = "score-with-normalize-plugin"
 	filterPluginName             = "filter-plugin"
+	postFilterPluginName         = "postfilter-plugin"
 	reservePluginName            = "reserve-plugin"
 	prebindPluginName            = "prebind-plugin"
 	unreservePluginName          = "unreserve-plugin"

@@ -113,6 +119,7 @@ var _ = framework.FilterPlugin(&FilterPlugin{})
 var _ = framework.ScorePlugin(&ScorePlugin{})
 var _ = framework.ScoreWithNormalizePlugin(&ScoreWithNormalizePlugin{})
 var _ = framework.ReservePlugin(&ReservePlugin{})
+var _ = framework.PostFilterPlugin(&PostFilterPlugin{})
 var _ = framework.PrebindPlugin(&PrebindPlugin{})
 var _ = framework.BindPlugin(&BindPlugin{})
 var _ = framework.PostbindPlugin(&PostbindPlugin{})

@@ -241,6 +248,34 @@ func NewReservePlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framewor
 	return resPlugin, nil
 }
 
+// Name returns name of the plugin.
+func (*PostFilterPlugin) Name() string {
+	return postFilterPluginName
+}
+
+var postFilterPlugin = &PostFilterPlugin{}
+
+// PostFilter is a test function.
+func (pfp *PostFilterPlugin) PostFilter(_ *framework.PluginContext, pod *v1.Pod, _ []*v1.Node, _ framework.NodeToStatusMap) *framework.Status {
+	pfp.numPostFilterCalled++
+	if pfp.failPostFilter {
+		return framework.NewStatus(framework.Error, fmt.Sprintf("injecting failure for pod %v", pod.Name))
+	}
+
+	return nil
+}
+
+// reset used to reset postfilter plugin.
+func (pfp *PostFilterPlugin) reset() {
+	pfp.numPostFilterCalled = 0
+	pfp.failPostFilter = false
+}
+
+// NewPostFilterPlugin is the factory for post-filter plugin.
+func NewPostFilterPlugin(_ *runtime.Unknown, _ framework.FrameworkHandle) (framework.Plugin, error) {
+	return postFilterPlugin, nil
+}
+
 var pbdPlugin = &PrebindPlugin{}
 
 // Name returns name of the plugin.

@@ -1447,6 +1482,65 @@ func TestFilterPlugin(t *testing.T) {
 	}
 }
 
+// TestPostFilterPlugin tests invocation of post-filter plugins.
+func TestPostFilterPlugin(t *testing.T) {
+	// Create a plugin registry for testing. Register only a post-filter plugin.
+	registry := framework.Registry{postFilterPluginName: NewPostFilterPlugin}
+
+	// Setup initial post-filter plugin for testing.
+	pluginsConfig := &schedulerconfig.Plugins{
+		PostFilter: &schedulerconfig.PluginSet{
+			Enabled: []schedulerconfig.Plugin{
+				{
+					Name: postFilterPluginName,
+				},
+			},
+		},
+	}
+	// Set empty plugin config for testing
+	emptyPluginConfig := []schedulerconfig.PluginConfig{}
+
+	// Create the master and the scheduler with the test plugin set.
+	context := initTestSchedulerWithOptions(t,
+		initTestMaster(t, "post-filter-plugin", nil),
+		false, nil, registry, pluginsConfig, emptyPluginConfig, false, time.Second)
+	defer cleanupTest(t, context)
+
+	cs := context.clientSet
+	// Add a few nodes.
+	_, err := createNodes(cs, "test-node", nil, 2)
+	if err != nil {
+		t.Fatalf("Cannot create nodes: %v", err)
+	}
+
+	for _, fail := range []bool{false, true} {
+		postFilterPlugin.failPostFilter = fail
+		// Create a best effort pod.
+		pod, err := createPausePod(cs,
+			initPausePod(cs, &pausePodConfig{Name: "test-pod", Namespace: context.ns.Name}))
+		if err != nil {
+			t.Errorf("Error while creating a test pod: %v", err)
+		}
+
+		if fail {
+			if err = waitForPodUnschedulable(cs, pod); err != nil {
+				t.Errorf("Didn't expect the pod to be scheduled. error: %v", err)
+			}
+		} else {
+			if err = waitForPodToSchedule(cs, pod); err != nil {
+				t.Errorf("Expected the pod to be scheduled. error: %v", err)
+			}
+		}
+
+		if postFilterPlugin.numPostFilterCalled == 0 {
+			t.Errorf("Expected the post-filter plugin to be called.")
+		}
+
+		postFilterPlugin.reset()
+		cleanupPods(cs, t, []*v1.Pod{pod})
+	}
+}
+
 // TestPreemptWithPermitPlugin tests preempt with permit plugins.
 func TestPreemptWithPermitPlugin(t *testing.T) {
 	// Create a plugin registry for testing. Register only a permit plugin.