- scheduler extenders that are capable of maintaining their own
  node cache don't need to receive full information about every
  candidate node. For huge clusters, sending full node information
  for all nodes in the cluster over the wire every time a pod is
  scheduled is expensive. If the extender caches node information
  itself, sending the node name alone is sufficient. These changes
  provide that optimization in a backward-compatible way (see the
  sketch after this list)
- removed the inadvertent signature change of the Prioritize() function
- added defensive checks as suggested
- added an annotation-specific test case
- updated the comments in the scheduler types
- got rid of the unused apiVersion
- used *v1.NodeList as suggested
- backed out the pod-annotation-update changes made in the
  first commit
- adjusted the comments in types.go and v1/types.go as suggested
  in the code review
commit dda62ec207 (parent 17375fc59f)
Author: Sarat Kamisetty
Date:   2017-01-31 22:57:42 -08:00

10 changed files with 222 additions and 73 deletions


@@ -61,9 +61,10 @@ type priorityConfig struct {
 }
 
 type Extender struct {
-    name         string
-    predicates   []fitPredicate
-    prioritizers []priorityConfig
+    name             string
+    predicates       []fitPredicate
+    prioritizers     []priorityConfig
+    nodeCacheCapable bool
 }
 
 func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Request) {
@@ -81,12 +82,9 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ
     if strings.Contains(req.URL.Path, filter) {
-        resp := &schedulerapi.ExtenderFilterResult{}
-        nodes, failedNodes, err := e.Filter(&args.Pod, &args.Nodes)
+        resp, err := e.Filter(&args)
         if err != nil {
             resp.Error = err.Error()
-        } else {
-            resp.Nodes = *nodes
-            resp.FailedNodes = failedNodes
         }
 
         if err := encoder.Encode(resp); err != nil {
@@ -95,7 +93,7 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ
     } else if strings.Contains(req.URL.Path, prioritize) {
         // Prioritize errors are ignored. Default k8s priorities or another extender's
         // priorities may be applied.
-        priorities, _ := e.Prioritize(&args.Pod, &args.Nodes)
+        priorities, _ := e.Prioritize(&args)
 
         if err := encoder.Encode(priorities); err != nil {
             t.Fatalf("Failed to encode %+v", priorities)
@@ -105,15 +103,21 @@ func (e *Extender) serveHTTP(t *testing.T, w http.ResponseWriter, req *http.Requ
     }
 }
 
-func (e *Extender) Filter(pod *v1.Pod, nodes *v1.NodeList) (*v1.NodeList, schedulerapi.FailedNodesMap, error) {
-    filtered := []v1.Node{}
+func (e *Extender) filterUsingNodeCache(args *schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
+    nodeSlice := make([]string, 0)
     failedNodesMap := schedulerapi.FailedNodesMap{}
-    for _, node := range nodes.Items {
+    for _, nodeName := range *args.NodeNames {
         fits := true
         for _, predicate := range e.predicates {
-            fit, err := predicate(pod, &node)
+            fit, err := predicate(&args.Pod,
+                &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
             if err != nil {
-                return &v1.NodeList{}, schedulerapi.FailedNodesMap{}, err
+                return &schedulerapi.ExtenderFilterResult{
+                    Nodes:       nil,
+                    NodeNames:   nil,
+                    FailedNodes: schedulerapi.FailedNodesMap{},
+                    Error:       err.Error(),
+                }, err
             }
             if !fit {
                 fits = false
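
Note that in the cache path the predicate only ever sees a Node carrying a
name, so predicates must decide on the name or on state the extender has
cached. A minimal sketch of a compatible predicate, assuming the test's
fitPredicate signature and guessing at the body of machine_1_2_3_Predicate
(the real helper is not shown in this hunk):

    // Hypothetical body for machine_1_2_3_Predicate; it must work with only
    // ObjectMeta.Name populated on the node.
    func machine_1_2_3_Predicate(pod *v1.Pod, node *v1.Node) (bool, error) {
        switch node.Name {
        case "machine1", "machine2", "machine3":
            return true, nil
        default:
            return false, nil
        }
    }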
@@ -121,24 +125,78 @@ func (e *Extender) Filter(pod *v1.Pod, nodes *v1.NodeList) (*v1.NodeList, schedu
             }
         }
         if fits {
-            filtered = append(filtered, node)
+            nodeSlice = append(nodeSlice, nodeName)
         } else {
-            failedNodesMap[node.Name] = fmt.Sprintf("extender failed: %s", e.name)
+            failedNodesMap[nodeName] = fmt.Sprintf("extender failed: %s", e.name)
         }
     }
 
-    return &v1.NodeList{Items: filtered}, failedNodesMap, nil
+    return &schedulerapi.ExtenderFilterResult{
+        Nodes:       nil,
+        NodeNames:   &nodeSlice,
+        FailedNodes: failedNodesMap,
+    }, nil
 }
 
-func (e *Extender) Prioritize(pod *v1.Pod, nodes *v1.NodeList) (*schedulerapi.HostPriorityList, error) {
+func (e *Extender) Filter(args *schedulerapi.ExtenderArgs) (*schedulerapi.ExtenderFilterResult, error) {
+    filtered := []v1.Node{}
+    failedNodesMap := schedulerapi.FailedNodesMap{}
+
+    if e.nodeCacheCapable {
+        return e.filterUsingNodeCache(args)
+    } else {
+        for _, node := range args.Nodes.Items {
+            fits := true
+            for _, predicate := range e.predicates {
+                fit, err := predicate(&args.Pod, &node)
+                if err != nil {
+                    return &schedulerapi.ExtenderFilterResult{
+                        Nodes:       &v1.NodeList{},
+                        NodeNames:   nil,
+                        FailedNodes: schedulerapi.FailedNodesMap{},
+                        Error:       err.Error(),
+                    }, err
+                }
+                if !fit {
+                    fits = false
+                    break
+                }
+            }
+            if fits {
+                filtered = append(filtered, node)
+            } else {
+                failedNodesMap[node.Name] = fmt.Sprintf("extender failed: %s", e.name)
+            }
+        }
+
+        return &schedulerapi.ExtenderFilterResult{
+            Nodes:       &v1.NodeList{Items: filtered},
+            NodeNames:   nil,
+            FailedNodes: failedNodesMap,
+        }, nil
+    }
+}
+
+func (e *Extender) Prioritize(args *schedulerapi.ExtenderArgs) (*schedulerapi.HostPriorityList, error) {
     result := schedulerapi.HostPriorityList{}
     combinedScores := map[string]int{}
+    var nodes = &v1.NodeList{Items: []v1.Node{}}
+
+    if e.nodeCacheCapable {
+        for _, nodeName := range *args.NodeNames {
+            nodes.Items = append(nodes.Items, v1.Node{ObjectMeta: metav1.ObjectMeta{Name: nodeName}})
+        }
+    } else {
+        nodes = args.Nodes
+    }
+
     for _, prioritizer := range e.prioritizers {
         weight := prioritizer.weight
         if weight == 0 {
             continue
         }
         priorityFunc := prioritizer.function
-        prioritizedList, err := priorityFunc(pod, nodes)
+        prioritizedList, err := priorityFunc(&args.Pod, nodes)
         if err != nil {
             return &schedulerapi.HostPriorityList{}, err
         }
@@ -219,6 +277,17 @@ func TestSchedulerExtender(t *testing.T) {
     }))
     defer es2.Close()
 
+    extender3 := &Extender{
+        name:             "extender3",
+        predicates:       []fitPredicate{machine_1_2_3_Predicate},
+        prioritizers:     []priorityConfig{{machine_2_Prioritizer, 5}},
+        nodeCacheCapable: true,
+    }
+
+    es3 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+        extender3.serveHTTP(t, w, req)
+    }))
+    defer es3.Close()
 
     policy := schedulerapi.Policy{
         ExtenderConfigs: []schedulerapi.ExtenderConfig{
             {
@@ -235,6 +304,14 @@ func TestSchedulerExtender(t *testing.T) {
                 Weight:         4,
                 EnableHttps:    false,
             },
+            {
+                URLPrefix:        es3.URL,
+                FilterVerb:       filter,
+                PrioritizeVerb:   prioritize,
+                Weight:           10,
+                EnableHttps:      false,
+                NodeCacheCapable: true,
+            },
         },
     }
     policy.APIVersion = api.Registry.GroupOrDie(v1.GroupName).GroupVersion.String()
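
The policy knob that turns the name-only protocol on is the NodeCacheCapable
flag used in the literal above. Sketched from the fields visible here; the
upstream schedulerapi.ExtenderConfig carries more fields than shown:

    // Partial sketch of ExtenderConfig, limited to the fields this test sets;
    // only NodeCacheCapable is new in this change.
    type ExtenderConfig struct {
        URLPrefix      string // base URL of the extender service
        FilterVerb     string // appended to URLPrefix for filter calls
        PrioritizeVerb string // appended to URLPrefix for prioritize calls
        Weight         int    // multiplier for this extender's priority scores
        EnableHttps    bool
        // NodeCacheCapable declares that the extender keeps its own node cache,
        // so the scheduler may send NodeNames instead of full Nodes.
        NodeCacheCapable bool
    }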
@@ -299,10 +376,11 @@ func DoTestPodScheduling(ns *v1.Namespace, t *testing.T, cs clientset.Interface)
t.Fatalf("Failed to schedule pod: %v", err)
}
if myPod, err := cs.Core().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{}); err != nil {
myPod, err = cs.Core().Pods(ns.Name).Get(myPod.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get pod: %v", err)
} else if myPod.Spec.NodeName != "machine3" {
t.Fatalf("Failed to schedule using extender, expected machine3, got %v", myPod.Spec.NodeName)
} else if myPod.Spec.NodeName != "machine2" {
t.Fatalf("Failed to schedule using extender, expected machine2, got %v", myPod.Spec.NodeName)
}
t.Logf("Scheduled pod using extenders")
}